Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Ändern Sie die Parser-Lambda-Funktion in Amazon Bedrock Agents
Jede Eingabeaufforderungsvorlage enthält eine Parser-Lambda-Funktion, die Sie ändern können. Um eine benutzerdefinierte Parser-Lambda-Funktion zu schreiben, müssen Sie das Eingabeereignis, das Ihr Agent sendet, und die Antwort, die der Agent als Ausgabe der Lambda-Funktion erwartet, verstehen. Sie schreiben eine Handler-Funktion, um Variablen aus dem Eingabeereignis zu bearbeiten und die Antwort zurückzugeben. Weitere Informationen zur AWS Lambda Funktionsweise finden Sie unter Event-driven Invocation im Developer Guide. AWS Lambda
Parser-Lambda-Eingabeereignis
Im Folgenden finden Sie die allgemeine Struktur des Eingabeereignisses des Agenten. Schreiben Sie Ihre Lambda-Handler-Funktion in die Felder.
{ "messageVersion": "1.0", "agent": { "name": "string", "id": "string", "alias": "string", "version": "string" }, "invokeModelRawResponse": "string", "promptType": "ORCHESTRATION | POST_PROCESSING | PRE_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION ", "overrideType": "OUTPUT_PARSER" }
In der folgenden Liste werden die Eingabeereignisfelder beschrieben:
-
messageVersion
: Die Version der Mitteilung, die das Format der Ereignisdaten, die in die Lambda-Funktion eingehen, und das erwartete Format der Antwort von der Lambda-Funktion identifiziert. Amazon Bedrock Agents unterstützt nur Version 1.0. -
agent
: Enthält Informationen über den Namen, die ID, den Alias und die Version des Agenten, dem die Eingabeaufforderung angehört. -
invokeModelRawResponse
: Die Rohausgabe des Basismodells der Eingabeaufforderung, deren Ausgabe analysiert werden soll. -
promptType
: Der Eingabeaufforderungstyp, deren Ausgabe analysiert werden soll. -
overrideType
: Die Artefakte, die diese Lambda-Funktion überschreibt. DerzeitOUTPUT_PARSER
wird nur unterstützt, was darauf hinweist, dass der Standardparser überschrieben werden soll.
Antwort der Lambda-Funktion
Ihr Agent erwartet eine Antwort von Ihrer Lambda-Funktion und verwendet die Antwort, um weitere Maßnahmen zu ergreifen oder ihm zu helfen, eine Antwort an den Benutzer zurückzugeben. Ihr Agent führt die nächste Aktion aus, die vom Modell des Agenten empfohlen wird. Die nächsten Aktionen können seriell oder parallel ausgeführt werden, abhängig vom Modell des Agenten und davon, wann der Agent erstellt und vorbereitet wurde.
Wenn Sie Ihren Agenten vor dem 4. Oktober 2024 erstellt und vorbereitet haben und wenn Ihr Agent Folgendes verwendet Anthropic Claude 3 Sonnet or Anthropic Claude 3.5 Sonnet Modelle: Standardmäßig wird die nächste wichtige Aktion, die vom Modell des Agenten empfohlen wird, seriell ausgeführt.
Wenn Sie nach dem 10. Oktober 2024 einen neuen Agenten erstellt oder einen vorhandenen Agenten vorbereitet haben und Ihr Agent verwendet Anthropic Claude 3 Sonnet, Anthropic Claude 3.5 Sonnet, oder irgendein non-Anthropic Modelle, die Aktionen für den nächsten Schritt, die vom Modell des Agenten empfohlen werden, werden parallel ausgeführt. Das bedeutet, dass mehrere Aktionen, beispielsweise eine Mischung aus Aktionsgruppen, Funktionen und Wissensdatenbanken, parallel ausgeführt werden. Dadurch wird die Anzahl der Aufrufe an das Modell reduziert, was die Gesamtlatenz reduziert.
Sie können parallel Aktionen für Ihre Agenten aktivieren, die vor dem 18. Oktober 2024 erstellt und vorbereitet wurden, indem Sie im Agent Builder Ihres Agenten in der Konsole anrufen PrepareAgentAPIoder Prepare auswählen. Nachdem der Agent vorbereitet ist, sehen Sie eine aktualisierte Eingabeaufforderungsvorlage und eine neue Version des Parser-Lambda-Schemas.
Beispiel für eine Lambda-Antwort eines Parsers
Im Folgenden finden Sie Beispiele für die allgemeine Struktur der Antwort eines Agenten, der die empfohlenen nächsten Aktionen nacheinander ausführt, und des Agenten, der die nächsten Aktionen parallel ausführt. Verwenden Sie die Antwortfelder der Lambda-Funktion, um zu konfigurieren, wie die Ausgabe zurückgegeben wird.
Beispiel für eine Antwort eines Agenten, der nacheinander die empfohlenen nächsten empfohlenen Aktionen ausführt
Wählen Sie die Registerkarte aus, die der Angabe entspricht, ob Sie die Aktionsgruppe mit einem definiert haben OpenAPI Schema oder mit Funktionsdetails:
Anmerkung
Das MessageVersion 1.0
gibt an, dass der Agent die nächsten empfohlenen Aktionen nacheinander ausführt.
- OpenAPI schema
-
{ "messageVersion": "1.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "apiName": "string", "id": "string", "verb": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } } - Function details
-
{ "messageVersion": "1.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "functionName": "string", "id": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } }
Beispielantwort eines Agenten, der die nächsten Aktionen parallel ausführt
Wählen Sie die Registerkarte aus, die der Angabe entspricht, ob Sie die Aktionsgruppe mit einem definiert haben OpenAPI Schema oder mit Funktionsdetails:
Anmerkung
Das MessageVersion 2.0
zeigt an, dass der Agent die nächsten empfohlenen Aktionen parallel ausführt
- OpenAPI schema
-
{ "messageVersion": "2.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocations": [ { "actionGroupName": "string", "apiName": "string", "verb": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } } ], "agentKnowledgeBases": [ { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } } ], "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } } - Function details
-
{ "messageVersion": "2.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocations": [ { "actionGroupName": "string", "functionName": "string", "actionGroupInput": { "
<parameter>
"": { "value": "string" }, ... } } ], "agentKnowledgeBases": [ { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } } ], "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } }
In der folgenden Liste werden die Lambda-Antwortfelder beschrieben:
-
messageVersion
: Die Version der Mitteilung, die das Format der Ereignisdaten, die in die Lambda-Funktion eingehen, und das erwartete Format der Antwort von einer Lambda-Funktion identifiziert. -
promptType
: Der Eingabeaufforderungstyp des aktuellen Abschnitts. -
preProcessingParsedResponse
: Die analysierte Antwort für den EingabeaufforderungstypPRE_PROCESSING
. -
orchestrationParsedResponse
: Die analysierte Antwort für den EingabeaufforderungstypORCHESTRATION
. Weitere Details finden Sie unten. -
knowledgeBaseResponseGenerationParsedResponse
: Die analysierte Antwort für den EingabeaufforderungstypKNOWLEDGE_BASE_RESPONSE_GENERATION
. -
postProcessingParsedResponse
: Die analysierte Antwort für den EingabeaufforderungstypPOST_PROCESSING
.
Weitere Informationen zu den analysierten Antworten für die vier Eingabeaufforderungsvorlagen finden Sie auf den folgenden Registerkarten.
- preProcessingParsedResponse
-
{ "isValidInput": "boolean", "rationale": "string" }
Die
preProcessingParsedResponse
enthält die folgenden Felder:-
isValidInput
: Gibt an, ob die Benutzereingabe gültig ist oder nicht. Sie können die Funktion definieren, um zu bestimmen, wie die Gültigkeit von Benutzereingaben charakterisiert werden soll. -
rationale
: Die Begründung für die Kategorisierung von Benutzereingaben. Diese Begründung liefert das Modell in der Rohantwort, die Lambda-Funktion analysiert es und der Agent präsentiert es zur Vorverarbeitung im Trace.
-
- orchestrationResponse
-
Das Format von
orchestrationResponse
hängt davon ab, ob Sie die Aktionsgruppe mit einem definiert haben OpenAPI Schema- oder Funktionsdetails:-
Wenn Sie die Aktionsgruppe mit einem definiert haben OpenAPI Schema, die Antwort muss das folgende Format haben:
{ "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "apiName": "string", "id": "string", "verb": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] }, ... ] } }, } } -
Wenn Sie die Aktionsgruppe mit Funktionsdetails definiert haben, muss die Antwort das folgende Format haben:
{ "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "functionName": "string", "id": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] }, ... ] } }, } }
Das
orchestrationParsedResponse
enthält die folgenden Felder:-
rationale
: Die Begründung für die nächsten Schritte basierend auf den Ergebnissen des Basismodells. Sie können die Funktion definieren, die anhand der Modellausgabe analysiert werden soll. -
parsingErrorDetails
: Enthält dierepromptResponse
, mit der das Modell erneut aufgefordert wird, seine Rohantwort zu aktualisieren, wenn die Modellantwort nicht analysiert werden kann. Sie können die Funktion definieren, um zu steuern, wie das Modell erneut aufgefordert wird. -
responseDetails
: Enthält die Einzelheiten zur Handhabung der Ausgabe des Basismodells. Enthält eineninvocationType
(der nächste Schritt, den der Agent ausführen muss) und ein zweites Feld, das deminvocationType
entsprechen sollte. Die folgenden Objekte sind möglich.-
agentAskUser
: Kompatibel mit demASK_USER
-Aufruftyp. Dieser Aufruftyp beendet den Orchestrierungsschritt. Enthält denresponseText
, mit dem die Benutzer um weitere Informationen gebeten werden. Sie können Ihre Funktion definieren, um dieses Feld zu bearbeiten. -
actionGroupInvocation
: Kompatibel mit demACTION_GROUP
-Aufruftyp. Sie können Ihre Lambda-Funktion so definieren, dass Aktionsgruppen aufgerufen und Parameter übergeben werden. Enthält die folgenden Felder:-
actionGroupName
: Die Aktionsgruppe, die aufgerufen werden soll. -
Die folgenden Felder sind erforderlich, wenn Sie die Aktionsgruppe mit einem definiert haben OpenAPI Schema:
-
apiName
— Der Name der API Operation, die in der Aktionsgruppe aufgerufen werden soll. -
verb
— Die Methode der API Operation, die verwendet werden soll.
-
-
Das folgende Feld ist erforderlich, wenn Sie die Aktionsgruppe mit Funktionsdetails definiert haben:
-
functionName
— Der Name der Funktion, die in der Aktionsgruppe aufgerufen werden soll.
-
-
actionGroupInput
— Enthält Parameter, die in der API Operationsanforderung angegeben werden müssen.
-
-
agentKnowledgeBase
: Kompatibel mit demKNOWLEDGE_BASE
-Aufruftyp. Sie können Ihre Funktion definieren, um zu bestimmen, wie Wissensdatenbanken abgefragt werden sollen. Enthält die folgenden Felder:-
knowledgeBaseId
: Die eindeutige Kennung der Wissensdatenbank. -
searchQuery
— Enthält die Abfrage, die an die Wissensdatenbank in demvalue
Feld gesendet werden soll.
-
-
agentFinalResponse
: Kompatibel mit demFINISH
-Aufruftyp. Dieser Aufruftyp beendet den Orchestrierungsschritt. Enthält die Antwort an den Benutzer im FeldresponseText
und Zitate für die Antwort im Objektcitations
.
-
-
- knowledgeBaseResponseGenerationParsedResponse
-
{ "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ { "sourceId": "string" }, ... ] }, ... ] } }
Das
knowledgeBaseResponseGenerationParsedResponse
enthält dasgeneratedResponse
Formular zum Abfragen der Wissensdatenbank und Verweise auf die Datenquellen. - postProcessingParsedResponse
-
{ "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ { "sourceId": "string" }, ... ] }, ... ] } }
Das
postProcessingParsedResponse
enthält die folgenden Felder:-
responseText
: Die Antwort, die an den Endbenutzer zurückgegeben werden soll. Sie können die Funktion zum Formatieren der Antwort definieren. -
citations
: Enthält eine Liste von Zitaten für die Antwort. Jedes Zitat zeigt den zitierten Text und seine Quellenangaben.
-
Beispiele für Parser-Lambda
Um Beispiele für Eingabeereignisse und Antworten der Lambda-Parser-Funktion zu sehen, wählen Sie eine der folgenden Registerkarten aus.
- Pre-processing
-
Beispiel für ein Eingabeereignis
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": " <thinking>\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n</thinking>\n\n<category>B</category>", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "PRE_PROCESSING" }
Beispielantwort
{ "promptType": "PRE_PROCESSING", "preProcessingParsedResponse": { "rationale": "\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n", "isValidInput": false } }
- Orchestration
-
Beispiel für ein Eingabeereignis
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID23456::Search function.\\n\\n</scratchpad>\\n\\n<function_call>GET::x_amz_knowledgebase_KBID123456::Search(searchQuery=\"What is the phone number I can call?\)", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "ORCHESTRATION" }
Beispielantwort
{ "promptType": "ORCHESTRATION", "orchestrationParsedResponse": { "rationale": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call Farmers.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID123456::Search function.", "responseDetails": { "invocationType": "KNOWLEDGE_BASE", "agentKnowledgeBase": { "searchQuery": { "value": "What is the phone number I can call?" }, "knowledgeBaseId": "KBID123456" } } } }
- Knowledge base response generation
-
Beispiel für ein Eingabeereignis
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "{\"completion\":\" <answer>\\\\n<answer_part>\\\\n<text>\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n</text>\\\\n<sources>\\\\n<source>1234567-1234-1234-1234-123456789abc</source>\\\\n<source>2345678-2345-2345-2345-23456789abcd</source>\\\\n<source>3456789-3456-3456-3456-3456789abcde</source>\\\\n</sources>\\\\n</answer_part>\\\\n</answer>\",\"stop_reason\":\"stop_sequence\",\"stop\":\"\\\\n\\\\nHuman:\"}", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION" }
Beispielantwort
{ "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION", "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n", "references": [ {"sourceId": "1234567-1234-1234-1234-123456789abc"}, {"sourceId": "2345678-2345-2345-2345-23456789abcd"}, {"sourceId": "3456789-3456-3456-3456-3456789abcde"} ] } ] } } }
- Post-processing
-
Beispiel für ein Eingabeereignis
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "<final_response>\\nBased on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 1234567-1234-1234-1234-123456789abc, 2345678-2345-2345-2345-23456789abcd). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions.\\n</final_response>", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "POST_PROCESSING" }
Beispielantwort
{ "promptType": "POST_PROCESSING", "postProcessingParsedResponse": { "responseText": "Based on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 24c62d8c-3e39-4ca1-9470-a91d641fe050, 197815ef-8798-4cb1-8aa5-35f5d6b28365). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions." } }
Um Lambda-Beispielfunktionen für Parser zu sehen, erweitern Sie den Abschnitt mit den Beispielen für Prompt-Vorlagen, die Sie sehen möchten. Die lambda_handler
-Funktion gibt die analysierte Antwort an den Agenten zurück.
Das folgende Beispiel zeigt eine Lambda-Funktion für die Vorverarbeitung des Parsers, in die geschrieben wurde Python.
import json import re import logging PRE_PROCESSING_RATIONALE_REGEX = "<thinking>(.*?)</thinking>" PREPROCESSING_CATEGORY_REGEX = "<category>(.*?)</category>" PREPROCESSING_PROMPT_TYPE = "PRE_PROCESSING" PRE_PROCESSING_RATIONALE_PATTERN = re.compile(PRE_PROCESSING_RATIONALE_REGEX, re.DOTALL) PREPROCESSING_CATEGORY_PATTERN = re.compile(PREPROCESSING_CATEGORY_REGEX, re.DOTALL) logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default PreProcessing prompt def lambda_handler(event, context): print("Lambda input: " + str(event)) logger.info("Lambda input: " + str(event)) prompt_type = event["promptType"] # Sanitize LLM response model_response = sanitize_response(event['invokeModelRawResponse']) if event["promptType"] == PREPROCESSING_PROMPT_TYPE: return parse_pre_processing(model_response) def parse_pre_processing(model_response): category_matches = re.finditer(PREPROCESSING_CATEGORY_PATTERN, model_response) rationale_matches = re.finditer(PRE_PROCESSING_RATIONALE_PATTERN, model_response) category = next((match.group(1) for match in category_matches), None) rationale = next((match.group(1) for match in rationale_matches), None) return { "promptType": "PRE_PROCESSING", "preProcessingParsedResponse": { "rationale": rationale, "isValidInput": get_is_valid_input(category) } } def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def get_is_valid_input(category): if category is not None and category.strip().upper() == "D" or category.strip().upper() == "E": return True return False
Die folgenden Beispiele zeigen eine Lambda-Funktion für Orchestrierungsparser, in die geschrieben wurde. Python.
Der Beispielcode unterscheidet sich je nachdem, ob Ihre Aktionsgruppe mit einem definiert wurde OpenAPI Schema oder mit Funktionsdetails:
-
Um Beispiele für eine Aktionsgruppe zu sehen, die mit einem definiert ist OpenAPI Schema: Wählen Sie die Registerkarte aus, die dem Modell entspricht, für das Sie sich Beispiele ansehen möchten.
- Anthropic Claude 2.0
-
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) verb, resource_name, function = match.group(1), match.group(2), match.group(3) parameters = {} for arg in match.group(4).split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 2.1
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3.5
-
import json import logging import re from collections import defaultdict RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" ASK_USER = "user__askuser" KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.setLevel("INFO") logger.info("Lambda input: " + str(event)) # Sanitize LLM response response = load_response(event['invokeModelRawResponse']) stop_reason = response["stop_reason"] content = response["content"] content_by_type = get_content_by_type(content) # Parse LLM response for any rationale rationale = parse_rationale(content_by_type) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } match stop_reason: case 'tool_use': # Check if there is an ask user try: ask_user = parse_ask_user(content_by_type) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user, 'id': content_by_type['tool_use'][0]['id'] }, } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(content_by_type, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case 'end_turn' | 'stop_sequence': # Check if there is a final answer try: if content_by_type["text"]: text_contents = content_by_type["text"] for text_content in text_contents: final_answer, generated_response_parts = parse_answer(text_content) if final_answer: parsed_response['orchestrationParsedResponse'][ 'responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case _: addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response def load_response(text): raw_text = r'{}'.format(text) json_text = json.loads(raw_text) return json_text def get_content_by_type(content): content_by_type = defaultdict(list) for content_value in content: content_by_type[content_value["type"]].append(content_value) return content_by_type def parse_rationale(content_by_type): if "text" in content_by_type: rationale = content_by_type["text"][0]["text"] if rationale is not None: rationale_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() return rationale return None def parse_answer(response): if has_generated_response(response["text"].strip()): return parse_generated_response(response) answer_match = ANSWER_PATTERN.search(response["text"].strip()) if answer_match: return answer_match.group(0).strip(), None return None, None def parse_generated_response(response): results = [] for match in ANSWER_PART_PATTERN.finditer(response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(content_by_type): try: if content_by_type["tool_use"][0]["name"] == ASK_USER: ask_user_question = content_by_type["tool_use"][0]["input"]["question"] if not ask_user_question: raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) return ask_user_question except ValueError as ex: raise ex return None def parse_function_call(content_by_type, parsed_response): try: content = content_by_type["tool_use"][0] tool_name = content["name"] action_split = tool_name.split('__') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() except ValueError as ex: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) parameters = {} for param, value in content["input"].items(): parameters[param] = {'value': value} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace( KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''), 'id': content["id"] } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters, "id": content["id"] } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
-
Um Beispiele für eine Aktionsgruppe anzuzeigen, die mit Funktionsdetails definiert wurde, wählen Sie die Registerkarte aus, die dem Modell entspricht, für das Sie Beispiele anzeigen möchten.
- Anthropic Claude 2.0
-
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX_API_SCHEMA = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" FUNCTION_CALL_REGEX_FUNCTION_SCHEMA = r"<function_call>(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX_API_SCHEMA, sanitized_response) match_function_schema = re.search(FUNCTION_CALL_REGEX_FUNCTION_SCHEMA, sanitized_response) if not match and not match_function_schema: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) if match: schema_type = 'API' verb, resource_name, function, param_arg = match.group(1), match.group(2), match.group(3), match.group(4) else: schema_type = 'FUNCTION' resource_name, function, param_arg = match_function_schema.group(1), match_function_schema.group(2), match_function_schema.group(3) parameters = {} for arg in param_arg.split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 2.1
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3.5
-
import json import logging import re from collections import defaultdict RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" ASK_USER = "user__askuser" KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.setLevel("INFO") logger.info("Lambda input: " + str(event)) # Sanitize LLM response response = load_response(event['invokeModelRawResponse']) stop_reason = response["stop_reason"] content = response["content"] content_by_type = get_content_by_type(content) # Parse LLM response for any rationale rationale = parse_rationale(content_by_type) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } match stop_reason: case 'tool_use': # Check if there is an ask user try: ask_user = parse_ask_user(content_by_type) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user, 'id': content_by_type['tool_use'][0]['id'] }, } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(content_by_type, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case 'end_turn' | 'stop_sequence': # Check if there is a final answer try: if content_by_type["text"]: text_contents = content_by_type["text"] for text_content in text_contents: final_answer, generated_response_parts = parse_answer(text_content) if final_answer: parsed_response['orchestrationParsedResponse'][ 'responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case _: addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response def load_response(text): raw_text = r'{}'.format(text) json_text = json.loads(raw_text) return json_text def get_content_by_type(content): content_by_type = defaultdict(list) for content_value in content: content_by_type[content_value["type"]].append(content_value) return content_by_type def parse_rationale(content_by_type): if "text" in content_by_type: rationale = content_by_type["text"][0]["text"] if rationale is not None: rationale_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() return rationale return None def parse_answer(response): if has_generated_response(response["text"].strip()): return parse_generated_response(response) answer_match = ANSWER_PATTERN.search(response["text"].strip()) if answer_match: return answer_match.group(0).strip(), None return None, None def parse_generated_response(response): results = [] for match in ANSWER_PART_PATTERN.finditer(response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(content_by_type): try: if content_by_type["tool_use"][0]["name"] == ASK_USER: ask_user_question = content_by_type["tool_use"][0]["input"]["question"] if not ask_user_question: raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) return ask_user_question except ValueError as ex: raise ex return None def parse_function_call(content_by_type, parsed_response): try: content = content_by_type["tool_use"][0] tool_name = content["name"] action_split = tool_name.split('__') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[1].strip() function = action_split[2].strip() except ValueError as ex: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) parameters = {} for param, value in content["input"].items(): parameters[param] = {'value': value} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace( KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''), 'id': content["id"] } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails'][ 'actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters, "id": content["id"] } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
Das folgende Beispiel zeigt eine Lambda-Funktion zur Generierung von Antworten in der Wissensdatenbank, in die geschrieben wurde. Python.
import json import re import logging ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default KB response generation prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) raw_response = event['invokeModelRawResponse'] parsed_response = { 'promptType': 'KNOWLEDGE_BASE_RESPONSE_GENERATION', 'knowledgeBaseResponseGenerationParsedResponse': { 'generatedResponse': parse_generated_response(raw_response) } } logger.info(parsed_response) return parsed_response def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return { 'generatedResponseParts': generated_response_parts } def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references
Das folgende Beispiel zeigt eine Lambda-Funktion für die Nachbearbeitung des Parsers, in die geschrieben wurde Python.
import json import re import logging FINAL_RESPONSE_REGEX = r"<final_response>([\s\S]*?)</final_response>" FINAL_RESPONSE_PATTERN = re.compile(FINAL_RESPONSE_REGEX, re.DOTALL) logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default PostProcessing prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) raw_response = event['invokeModelRawResponse'] parsed_response = { 'promptType': 'POST_PROCESSING', 'postProcessingParsedResponse': {} } matcher = FINAL_RESPONSE_PATTERN.search(raw_response) if not matcher: raise Exception("Could not parse raw LLM output") response_text = matcher.group(1).strip() parsed_response['postProcessingParsedResponse']['responseText'] = response_text logger.info(parsed_response) return parsed_response