

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Écriture d’une fonction Lambda d’analyseur personnalisée dans Amazon Bedrock Agents
<a name="lambda-parser"></a>

Chaque modèle d’invite inclut une fonction Lambda d’analyseur. Vous pouvez écrire votre fonction Lambda d’analyseur personnalisée et spécifier les modèles dont vous souhaitez remplacer la fonction d’analyseur par défaut. Afin d’écrire une fonction Lambda d’analyseur personnalisée, vous devez comprendre l’événement d’entrée que votre agent envoie et la réponse qu’il attend en sortie de la fonction Lambda. Vous devez écrire une fonction de gestionnaire pour manipuler les variables de l’événement d’entrée et renvoyer la réponse. Pour plus d'informations sur le AWS Lambda fonctionnement, consultez la section [Invocation pilotée par des événements](https://docs.aws.amazon.com/lambda/latest/dg/lambda-services.html#event-driven-invocation) dans le Guide du AWS Lambda développeur.

**Topics**
+ [Événement d’entrée Lambda de l’analyseur](#lambda-parser-input)
+ [Réponse Lambda de l’analyseur](#lambda-parser-response)
+ [Exemples de fonctions Lambda d’analyseur](#lambda-parser-example)

## Événement d’entrée Lambda de l’analyseur
<a name="lambda-parser-input"></a>

Voici la structure générale de l’événement d’entrée de l’agent. Écrivez la fonction de gestionnaire Lambda à l’aide des champs.

```
{
    "messageVersion": "1.0",
    "agent": {
        "name": "string",
        "id": "string",
        "alias": "string",
        "version": "string"
    },
    "invokeModelRawResponse": "string",
    "promptType": "ORCHESTRATION | ROUTING_CLASSIFIER | POST_PROCESSING | PRE_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION | MEMORY_SUMMARIZATION",
    "overrideType": "OUTPUT_PARSER"
}
```

La liste suivante décrit les champs d’événement d’entrée :
+ `messageVersion` : version du message identifiant le format des données d’événement entrant dans la fonction Lambda et le format attendu de la réponse provenant de la fonction Lambda. Amazon Bedrock Agents ne prend en charge que la version 1.0.
+ `agent` : contient des informations sur le nom, l’identifiant, l’alias et la version de l’agent auquel appartiennent les invites.
+ `invokeModelRawResponse` : sortie brute du modèle de fondation de l’invite dont la sortie doit être analysée.
+ `promptType` : type d’invite dont la sortie doit être analysée.
+ `overrideType` : artefacts que cette fonction Lambda remplace. À l’heure actuelle, seul `OUTPUT_PARSER` est pris en charge, ce qui indique que l’analyseur par défaut doit être remplacé.

## Réponse Lambda de l’analyseur
<a name="lambda-parser-response"></a>

Votre agent attend une réponse de votre fonction Lambda, puis cette réponse lui permet d’effectuer des actions plus poussées ou de renvoyer une réponse à l’utilisateur. Votre agent exécute l’action suivante recommandée par le modèle de l’agent. Les actions suivantes peuvent être exécutées dans un ordre séquentiel ou en parallèle en fonction du modèle de l’agent et de la date de création et de préparation de l’agent. 

Si vous avez créé et préparé votre agent *avant le 4 octobre 2024* et s’il utilise le modèle Anthropic Claude 3 Sonnet ou Anthropic Claude 3.5 Sonnet, par défaut, l’action suivante recommandée par le modèle de l’agent est exécutée par ordre séquentiel. 

Si vous avez créé un agent ou préparé un agent existant *après le 10 octobre 2024* et qu’il utilise Anthropic Claude 3 Sonnet, Anthropic Claude 3.5 Sonnet ou tout modèle non-Anthropic, les actions suivantes recommandées par le modèle de l’agent s’exécutent en parallèle. Autrement dit, plusieurs actions, par exemple un mélange de groupes d’actions, de fonctions et de bases de connaissances, sont exécutées en parallèle. Cette approche réduit le nombre d’appels vers le modèle, ce qui réduit la latence globale.

Vous pouvez activer les actions parallèles pour vos agents créés et préparés *avant le 4 octobre 2024* en appelant [PrepareAgent](https://docs.aws.amazon.com//bedrock/latest/APIReference/API_agent_PrepareAgent.html)l'API ou en sélectionnant **Prepare** dans le générateur d'agents de votre agent dans la console. Une fois l’agent préparé, vous voyez un modèle d’invite mis à jour et une nouvelle version du schéma Lambda de l’analyseur. 

**Exemple de réponse Lambda de l’analyseur**

Voici des exemples de la structure générale de la réponse de l’agent exécutant les actions suivantes recommandées par ordre séquentiel et de l’agent exécutant les actions suivantes en parallèle. Configurez la manière dont la sortie est renvoyée à l’aide des champs de réponse de la fonction Lambda.

**Exemple de réponse d’un agent exécutant les actions suivantes recommandées par ordre séquentiel**

Cliquez sur l’onglet correspondant à la définition ou non du groupe d’actions avec un schéma OpenAPI ou des détails de fonction :

**Note**  
`MessageVersion 1.0` indique que l’agent exécute les actions suivantes recommandées par ordre séquentiel. 

------
#### [ OpenAPI schema ]

```
{
    "messageVersion": "1.0",
    "promptType": "ORCHESTRATION | PRE_PROCESSING | ROUTING_CLASSIFIER | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
    "preProcessingParsedResponse": {
        "isValidInput": "boolean",
        "rationale": "string"
    },
    "orchestrationParsedResponse": {
        "rationale": "string",
        "parsingErrorDetails": {
            "repromptResponse": "string"
        },
        "responseDetails": {
            "invocationType": "AGENT_COLLABORATOR | ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
            "agentAskUser": {
                "responseText": "string",
                "id": "string"
            },
             "agentCollaboratorInvocation": {
                "agentCollaboratorName": "string",
                "input": {
                    "text": "string"                    
                }
            }
            ...
        }
    },
    "routingClassifierParsedResponse": {
        "parsingErrorDetails": {
            "repromptResponse": "string"
        },
        "responseDetails": {
            "type": "AGENT | LAST_AGENT | UNDECIDED",
            "agentCollaboratorInvocation": {
                "agentCollaboratorName": "string",
                "input": {
                    "text": "string"                    
                    }
            }
        }
    }
}
            "actionGroupInvocation": {
                "actionGroupName": "string",
                "apiName": "string",
                "id": "string",
                "verb": "string",
                "actionGroupInput": {
                    "<parameter>": {
                        "value": "string"
                    },
                    ...
                }
            },
            "agentKnowledgeBase": {
                "knowledgeBaseId": "string",
                "id": "string",
                "searchQuery": {
                    "value": "string"
                }
            },
            "agentFinalResponse": {
                "responseText": "string",
                "citations": {
                    "generatedResponseParts": [{
                        "text": "string",
                        "references": [{"sourceId": "string"}]
                    }]
                }
            },
        }
    },
    "knowledgeBaseResponseGenerationParsedResponse": { 
       "generatedResponse": {
            "generatedResponseParts": [
                {
                    "text": "string",
                    "references": [
                        {"sourceId": "string"},
                        ...
                    ]
                }
            ]
        }
    },
    "postProcessingParsedResponse": {
        "responseText": "string",
        "citations": {
            "generatedResponseParts": [{
                "text": "string",
                "references": [{
                    "sourceId": "string"
                }]
            }]
        }
    }
}
```

------
#### [ Function details ]

```
{
    "messageVersion": "1.0",
    "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
    "preProcessingParsedResponse": {
        "isValidInput": "boolean",
        "rationale": "string"
    },
    "orchestrationParsedResponse": {
        "rationale": "string",
        "parsingErrorDetails": {
            "repromptResponse": "string"
        },
        "responseDetails": {
            "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
            "agentAskUser": {
                "responseText": "string",
                "id": "string"
            },
            "actionGroupInvocation": {
                "actionGroupName": "string",
                "functionName": "string",
                "id": "string",
                "actionGroupInput": {
                    "<parameter>": {
                        "value": "string"
                    },
                    ...
                }
            },
            "agentKnowledgeBase": {
                "knowledgeBaseId": "string",
                "id": "string",
                "searchQuery": {
                    "value": "string"
                }
            },
            "agentFinalResponse": {
                "responseText": "string",
                "citations": {
                    "generatedResponseParts": [{
                        "text": "string",
                        "references": [{"sourceId": "string"}]
                    }]
                }
            },
        }
    },
    "knowledgeBaseResponseGenerationParsedResponse": { 
       "generatedResponse": {
            "generatedResponseParts": [
                {
                    "text": "string",
                    "references": [
                        {"sourceId": "string"},
                        ...
                    ]
                }
            ]
        }
    },
    "postProcessingParsedResponse": {
        "responseText": "string",
        "citations": {
            "generatedResponseParts": [{
                "text": "string",
                "references": [{
                    "sourceId": "string"
                }]
            }]
        }
    }
}
```

------

**Exemple de réponse d’un agent exécutant les actions suivantes en parallèle**

Cliquez sur l’onglet correspondant à la définition ou non du groupe d’actions avec un schéma OpenAPI ou des détails de fonction :

**Note**  
`MessageVersion 2.0` indique que l’agent exécute les actions suivantes recommandées en parallèle. 

------
#### [ OpenAPI schema ]

```
{
    "messageVersion": "2.0",
    "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
    "preProcessingParsedResponse": {
        "isValidInput": "boolean",
        "rationale": "string"
    },
    "orchestrationParsedResponse": {
        "rationale": "string",
        "parsingErrorDetails": {
            "repromptResponse": "string"
        },
        "responseDetails": {
            "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
            "agentAskUser": {
                "responseText": "string"
            },
            "actionGroupInvocations": [
                {
                    "actionGroupName": "string",
                    "apiName": "string",
                    "verb": "string",
                    "actionGroupInput": {
                        "<parameter>": {
                            "value": "string"
                        },
                        ...
                    }
                }
            ],
            "agentKnowledgeBases": [
                {
                    "knowledgeBaseId": "string",
                    "searchQuery": {
                        "value": "string"
                    }
                }
            ],
            "agentFinalResponse": {
                "responseText": "string",
                "citations": {
                    "generatedResponseParts": [{
                        "text": "string",
                        "references": [{"sourceId": "string"}]
                    }]
                }
            },
        }
    },
    "knowledgeBaseResponseGenerationParsedResponse": { 
       "generatedResponse": {
            "generatedResponseParts": [
                {
                    "text": "string",
                    "references": [
                        {"sourceId": "string"},
                        ...
                    ]
                }
            ]
        }
    },
    "postProcessingParsedResponse": {
        "responseText": "string",
        "citations": {
            "generatedResponseParts": [{
                "text": "string",
                "references": [{
                    "sourceId": "string"
                }]
            }]
        }
    }
}
```

------
#### [ Function details ]

```
{
    "messageVersion": "2.0",
    "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
    "preProcessingParsedResponse": {
        "isValidInput": "boolean",
        "rationale": "string"
    },
    "orchestrationParsedResponse": {
        "rationale": "string",
        "parsingErrorDetails": {
            "repromptResponse": "string"
        },
        "responseDetails": {
            "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
            "agentAskUser": {
                "responseText": "string"
            },
            "actionGroupInvocations": [
                {
                    "actionGroupName": "string",
                    "functionName": "string",
                    "actionGroupInput": {
                        "<parameter>"": {
                            "value": "string"
                        },
                        ...
                    }
                }
            ],
            "agentKnowledgeBases": [
                {
                    "knowledgeBaseId": "string",
                    "searchQuery": {
                        "value": "string"
                    }
                }
            ],
            "agentFinalResponse": {
                "responseText": "string",
                "citations": {
                    "generatedResponseParts": [{
                        "text": "string",
                        "references": [{"sourceId": "string"}]
                    }]
                }
            },
        }
    },
    "knowledgeBaseResponseGenerationParsedResponse": { 
       "generatedResponse": {
            "generatedResponseParts": [
                {
                    "text": "string",
                    "references": [
                        {"sourceId": "string"},
                        ...
                    ]
                }
            ]
        }
    },
    "postProcessingParsedResponse": {
        "responseText": "string",
        "citations": {
            "generatedResponseParts": [{
                "text": "string",
                "references": [{
                    "sourceId": "string"
                }]
            }]
        }
    }
}
```

------

La liste suivante décrit les champs de réponse Lambda :
+ `messageVersion` : version du message identifiant le format des données d’événement entrant dans la fonction Lambda et le format attendu de la réponse provenant d’une fonction Lambda. 
+ `promptType` : type d’invite du tour en cours.
+ `preProcessingParsedResponse` : réponse analysée pour le type d’invite `PRE_PROCESSING`.
+ `orchestrationParsedResponse` : réponse analysée pour le type d’invite `ORCHESTRATION`. Voir ci-dessous pour plus d’informations.
+ `knowledgeBaseResponseGenerationParsedResponse` : réponse analysée pour le type d’invite `KNOWLEDGE_BASE_RESPONSE_GENERATION`.
+ `postProcessingParsedResponse` : réponse analysée pour le type d’invite `POST_PROCESSING`.

Pour plus de détails sur les réponses analysées pour les quatre modèles d’invite, consultez les onglets suivants.

------
#### [ preProcessingParsedResponse ]

```
{
    "isValidInput": "boolean",
    "rationale": "string"
}
```

`preProcessingParsedResponse` contient les champs suivants.
+ `isValidInput` : spécifie si l’entrée utilisateur est valide ou non. Vous pouvez définir la fonction pour déterminer comment caractériser la validité de l’entrée utilisateur.
+ `rationale` : raisonnement qui sous-tend la catégorisation des entrées utilisateur. Cette justification est fournie par le modèle dans la réponse brute, la fonction Lambda l’analyse, puis l’agent la présente dans la trace à des fins de prétraitement.

------
#### [ orchestrationResponse ]

Le format de la `orchestrationResponse` varie selon que vous avez défini le groupe d’actions avec un schéma OpenAPI ou des détails de fonction :
+ Si vous avez défini le groupe d’actions avec un schéma OpenAPI, la réponse doit être au format suivant :

  ```
  {
      "rationale": "string",
      "parsingErrorDetails": {
          "repromptResponse": "string"
      },
      "responseDetails": {
          "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
          "agentAskUser": {
              "responseText": "string",
              "id": "string"
          },
          "actionGroupInvocation": {
              "actionGroupName": "string",
              "apiName": "string",
              "id": "string",
              "verb": "string",
              "actionGroupInput": {
                  "<parameter>": {
                      "value": "string"
                  },
                  ...
              }
          },
          "agentKnowledgeBase": {
              "knowledgeBaseId": "string",
              "id": "string",
              "searchQuery": {
                  "value": "string"
              }
          },
          "agentFinalResponse": {
              "responseText": "string",
              "citations": {
                  "generatedResponseParts": [
                      {
                          "text": "string",
                          "references": [
                              {"sourceId": "string"},
                              ...
                          ]
                      },
                      ...
                  ]
              }
          },
      }
  }
  ```
+ Si vous avez défini le groupe d’actions avec des détails de fonction, la réponse doit être au format suivant :

  ```
  {
      "rationale": "string",
      "parsingErrorDetails": {
          "repromptResponse": "string"
      },
      "responseDetails": {
          "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
          "agentAskUser": {
              "responseText": "string",
              "id": "string"
          },
          "actionGroupInvocation": {
              "actionGroupName": "string",
              "functionName": "string",
              "id": "string",
              "actionGroupInput": {
                  "<parameter>": {
                      "value": "string"
                  },
                  ...
              }
          },
          "agentKnowledgeBase": {
              "knowledgeBaseId": "string",
              "id": "string",
              "searchQuery": {
                  "value": "string"
              }
          },
          "agentFinalResponse": {
              "responseText": "string",
              "citations": {
                  "generatedResponseParts": [
                      {
                          "text": "string",
                          "references": [
                              {"sourceId": "string"},
                              ...
                          ]
                      },
                      ...
                  ]
              }
          },
      }
  }
  ```

`orchestrationParsedResponse` comporte les champs suivants :
+ `rationale` : raisonnement expliquant ce qu’il convient de faire ensuite, en fonction de la sortie du modèle de fondation. Vous pouvez définir la fonction à analyser à partir de la sortie du modèle.
+ `parsingErrorDetails` : contient `repromptResponse`, qui est le message demandant au modèle de mettre à jour sa réponse brute lorsque sa réponse ne peut pas être analysée. Vous pouvez définir la fonction pour manipuler la manière d’envoyer une nouvelle invite au modèle.
+ `responseDetails` : contient les informations relatives à la gestion de la sortie du modèle de fondation. Contient `invocationType`, qui est l’étape suivante à suivre par l’agent, et un second champ qui doit correspondre à `invocationType`. Les objets suivants sont possibles.
  + `agentAskUser` : compatible avec le type d’invocation `ASK_USER`. Ce type d’invocation met fin à l’étape d’orchestration. Contient `responseText` pour demander plus d’informations à l’utilisateur. Vous pouvez définir votre fonction pour manipuler ce champ.
  + `actionGroupInvocation` : compatible avec le type d’invocation `ACTION_GROUP`. Vous pouvez définir votre fonction Lambda pour déterminer les groupes d’actions à invoquer et les paramètres à transmettre. Contient les champs suivants :
    + `actionGroupName` : groupe d’actions à invoquer.
    + Les champs suivants sont obligatoires si vous avez défini le groupe d’actions avec un schéma OpenAPI :
      + `apiName` : nom de l’opération d’API à invoquer dans le groupe d’actions.
      + `verb` : méthode de l’opération d’API à utiliser.
    + Le champ suivant est obligatoire si vous avez défini le groupe d’actions avec des détails de fonction :
      + `functionName` : nom de la fonction à invoquer dans le groupe d’actions.
    + `actionGroupInput` : contient les paramètres à spécifier dans la demande d’opération d’API.
  + `agentKnowledgeBase` : compatible avec le type d’invocation `KNOWLEDGE_BASE`. Vous pouvez définir la fonction pour déterminer comment interroger les bases de connaissances. Contient les champs suivants :
    + `knowledgeBaseId` : identifiant unique de la base de connaissances.
    + `searchQuery` : contient la requête permettant d’envoyer la base de connaissances dans le champ `value`.
  + `agentFinalResponse` : compatible avec le type d’invocation `FINISH`. Ce type d’invocation met fin à l’étape d’orchestration. Contient la réponse à l’utilisateur dans le champ `responseText` et les citations correspondant à la réponse dans l’objet `citations`.

------
#### [ knowledgeBaseResponseGenerationParsedResponse ]

```
{ 
   "generatedResponse": {
        "generatedResponseParts": [
            {
                "text": "string",
                "references": [
                    { "sourceId": "string" },
                    ...
                ]
            },
            ...
        ]
    }
}
```

`knowledgeBaseResponseGenerationParsedResponse` contient `generatedResponse` provenant de l’interrogation de la base de connaissances et les références pour les sources de données.

------
#### [ postProcessingParsedResponse ]

```
{
    "responseText": "string",
    "citations": {
        "generatedResponseParts": [
            {
                "text": "string",
                "references": [
                    { "sourceId": "string" },
                    ...
                ]
            },
            ...
        ]
    }
}
```

`postProcessingParsedResponse` comporte les champs suivants :
+ `responseText` : réponse à renvoyer à l’utilisateur final. Vous pouvez définir la fonction pour mettre en forme cette réponse.
+ `citations` : contient une liste de citations correspondant à la réponse. Chaque citation indique le texte cité et ses références.

------

## Exemples de fonctions Lambda d’analyseur
<a name="lambda-parser-example"></a>

Pour voir des exemples d’événements et de réponses d’entrée de fonctions Lambda d’analyseur, cliquez sur l’un des onglets suivants.

------
#### [ Pre-processing ]

**Exemple d’événement d’entrée**

```
{
    "agent": {
        "alias": "TSTALIASID",
        "id": "AGENTID123",
        "name": "InsuranceAgent",
        "version": "DRAFT"
    },
    "invokeModelRawResponse": " <thinking>\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n</thinking>\n\n<category>B</category>",
    "messageVersion": "1.0",
    "overrideType": "OUTPUT_PARSER",
    "promptType": "PRE_PROCESSING"
}
```

**Exemple de réponse**

```
{
  "promptType": "PRE_PROCESSING",
  "preProcessingParsedResponse": {
    "rationale": "\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n",
    "isValidInput": false
  }
}
```

------
#### [ Orchestration ]

**Exemple d’événement d’entrée**

```
{
    "agent": {
        "alias": "TSTALIASID", 
        "id": "AGENTID123", 
        "name": "InsuranceAgent", 
        "version": "DRAFT"
    }, 
    "invokeModelRawResponse": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID23456::Search function.\\n\\n</scratchpad>\\n\\n<function_call>GET::x_amz_knowledgebase_KBID123456::Search(searchQuery=\"What is the phone number I can call?\)",
    "messageVersion": "1.0",
    "overrideType": "OUTPUT_PARSER",
    "promptType": "ORCHESTRATION"
}
```

**Exemple de réponse**

```
{
    "promptType": "ORCHESTRATION",
    "orchestrationParsedResponse": {
        "rationale": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call Farmers.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID123456::Search function.",
        "responseDetails": {
            "invocationType": "KNOWLEDGE_BASE",
            "agentKnowledgeBase": {
                "searchQuery": {
                    "value": "What is the phone number I can call?"
                },
                "knowledgeBaseId": "KBID123456"
            }
        }
    }
}
```

------
#### [ Knowledge base response generation ]

**Exemple d’événement d’entrée**

```
{
    "agent": {
        "alias": "TSTALIASID",
        "id": "AGENTID123", 
        "name": "InsuranceAgent",
        "version": "DRAFT"
    }, 
    "invokeModelRawResponse": "{\"completion\":\" <answer>\\\\n<answer_part>\\\\n<text>\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n</text>\\\\n<sources>\\\\n<source>1234567-1234-1234-1234-123456789abc</source>\\\\n<source>2345678-2345-2345-2345-23456789abcd</source>\\\\n<source>3456789-3456-3456-3456-3456789abcde</source>\\\\n</sources>\\\\n</answer_part>\\\\n</answer>\",\"stop_reason\":\"stop_sequence\",\"stop\":\"\\\\n\\\\nHuman:\"}",
    "messageVersion": "1.0",
    "overrideType": "OUTPUT_PARSER",
    "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION"
}
```

**Exemple de réponse**

```
{
    "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION",
    "knowledgeBaseResponseGenerationParsedResponse": {
        "generatedResponse": {
            "generatedResponseParts": [
                {
                    "text": "\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n",
                    "references": [
                        {"sourceId": "1234567-1234-1234-1234-123456789abc"},
                        {"sourceId": "2345678-2345-2345-2345-23456789abcd"},
                        {"sourceId": "3456789-3456-3456-3456-3456789abcde"}
                    ]
                }
            ]
        }
    }
}
```

------
#### [ Post-processing ]

**Exemple d’événement d’entrée**

```
{
    "agent": {
        "alias": "TSTALIASID",
        "id": "AGENTID123",
        "name": "InsuranceAgent",
        "version": "DRAFT"
    },
    "invokeModelRawResponse": "<final_response>\\nBased on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 1234567-1234-1234-1234-123456789abc, 2345678-2345-2345-2345-23456789abcd). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions.\\n</final_response>",
    "messageVersion": "1.0",
    "overrideType": "OUTPUT_PARSER",
    "promptType": "POST_PROCESSING"
}
```

**Exemple de réponse**

```
{
    "promptType": "POST_PROCESSING",
    "postProcessingParsedResponse": {
        "responseText": "Based on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 24c62d8c-3e39-4ca1-9470-a91d641fe050, 197815ef-8798-4cb1-8aa5-35f5d6b28365). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions."
    }
}
```

------
#### [ Memory summarization ]

**Exemple d’événement d’entrée**

```
{
    "messageVersion": "1.0",
    "promptType": "MEMORY_SUMMARIZATION",
    "invokeModelRawResponse": "<summary> <topic name="user goals">User initiated the conversation with a greeting.</topic> </summary>"
}
```

**Exemple de réponse**

```
{"topicwiseSummaries": [
    {
        "topic": "TopicName1",
        "summary": "My Topic 1 Summary"
    }
    ...
]
    
}
```

------

Pour voir des exemples de fonctions Lambda d’analyseur, développez la section contenant les exemples de modèles d’invite que vous souhaitez consulter. La fonction `lambda_handler` renvoie la réponse analysée à l’agent.

### Prétraitement
<a name="parser-preprocessing"></a>

L’exemple suivant illustre une fonction Lambda d’analyseur de prétraitement écrite en Python.

```
import json
import re
import logging

PRE_PROCESSING_RATIONALE_REGEX = "&lt;thinking&gt;(.*?)&lt;/thinking&gt;"
PREPROCESSING_CATEGORY_REGEX = "&lt;category&gt;(.*?)&lt;/category&gt;"
PREPROCESSING_PROMPT_TYPE = "PRE_PROCESSING"
PRE_PROCESSING_RATIONALE_PATTERN = re.compile(PRE_PROCESSING_RATIONALE_REGEX, re.DOTALL)
PREPROCESSING_CATEGORY_PATTERN = re.compile(PREPROCESSING_CATEGORY_REGEX, re.DOTALL)

logger = logging.getLogger()

# This parser lambda is an example of how to parse the LLM output for the default PreProcessing prompt
def lambda_handler(event, context):
    
    print("Lambda input: " + str(event))
    logger.info("Lambda input: " + str(event))
    
    prompt_type = event["promptType"]
    
    # Sanitize LLM response
    model_response = sanitize_response(event['invokeModelRawResponse'])
    
    if event["promptType"] == PREPROCESSING_PROMPT_TYPE:
        return parse_pre_processing(model_response)

def parse_pre_processing(model_response):
    
    category_matches = re.finditer(PREPROCESSING_CATEGORY_PATTERN, model_response)
    rationale_matches = re.finditer(PRE_PROCESSING_RATIONALE_PATTERN, model_response)

    category = next((match.group(1) for match in category_matches), None)
    rationale = next((match.group(1) for match in rationale_matches), None)

    return {
        "promptType": "PRE_PROCESSING",
        "preProcessingParsedResponse": {
            "rationale": rationale,
            "isValidInput": get_is_valid_input(category)
            }
        }

def sanitize_response(text):
    pattern = r"(\\n*)"
    text = re.sub(pattern, r"\n", text)
    return text
    
def get_is_valid_input(category):
    if category is not None and category.strip().upper() == "D" or category.strip().upper() == "E":
        return True
    return False
```

### Orchestration
<a name="parser-orchestration"></a>

Les exemples suivants illustrent une fonction Lambda d’analyseur d’orchestration écrite en Python.

L’exemple de code diffère selon que votre groupe d’actions a été défini avec un schéma OpenAPI ou avec des détails de fonction :

1. Pour voir des exemples pour un groupe d’actions défini avec un schéma OpenAPI, cliquez sur l’onglet correspondant au modèle pour lequel vous souhaitez voir des exemples.

------
#### [ Anthropic Claude 2.0 ]

   ```
   import json
   import re
   import logging
    
    
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_call>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
    
   RATIONALE_VALUE_REGEX_LIST = [
       "<scratchpad>(.*?)(</scratchpad>)",
       "(.*?)(</scratchpad>)",
       "(<scratchpad>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
    
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
    
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_call>"
    
   ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
    
   ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\""  
   ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL)
    
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
    
   FUNCTION_CALL_REGEX = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)"
    
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"  
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
    
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added"
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.'
   
   logger = logging.getLogger()
    
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
       
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
       
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
       
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
       
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
           
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
           
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
          
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
       
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
               
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
           
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
           
       raise Exception("unrecognized prompt type")
    
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
       
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None)
       
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
           
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
           
           return rationale
       
       return None
       
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
    
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
           
       return None, None
     
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
       
   def parse_generated_response(sanitized_llm_response):
       results = []
       
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
           
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
           
           text = text_match.group(1).strip()        
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
       
       final_response = " ".join([r[0] for r in results])
       
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text, 
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
           
       return final_response, generated_response_parts
   
       
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
    
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
       
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               ask_user = ask_user_matcher.group(2).strip()
               ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user)
               if ask_user_question_matcher:
                   return ask_user_question_matcher.group(1).strip()
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
           
       return None
    
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX, sanitized_response)
       if not match:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
       
       verb, resource_name, function = match.group(1), match.group(2), match.group(3)
       
       parameters = {}
       for arg in match.group(4).split(","):
           key, value = arg.split("=")
           parameters[key.strip()] = {'value': value.strip('" ')}
           
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
           
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
           
           return parsed_response
       
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
           "verb": verb, 
           "actionGroupName": resource_name,
           "apiName": function,
           "actionGroupInput": parameters
       }
       
       return parsed_response
       
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
       
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 2.1 ]

   ```
   import logging
   import re
   import xml.etree.ElementTree as ET
   
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_calls>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
   
   RATIONALE_VALUE_REGEX_LIST = [
       "<scratchpad>(.*?)(</scratchpad>)",
       "(.*?)(</scratchpad>)",
       "(<scratchpad>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
   
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
   
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_calls>"
   
   ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
   
   ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>"
   ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL)
   
   TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>"
   TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL)
   
   ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>"
   ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL)
   
   
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
   
   FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)"
   
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
   
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added."
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>."
   
   logger = logging.getLogger()
   
   
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
   
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
   
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
   
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
   
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
   
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
   
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
   
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
   
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
   
       raise Exception("unrecognized prompt type")
   
   
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
   
   
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next(
           (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)),
           None)
   
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
   
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next(
               (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
   
           return rationale
   
       return None
   
   
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
   
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
   
       return None, None
   
   
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
   
   
   def parse_generated_response(sanitized_llm_response):
       results = []
   
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
   
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
   
           text = text_match.group(1).strip()
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
   
       final_response = " ".join([r[0] for r in results])
   
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text,
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
   
       return final_response, generated_response_parts
   
   
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
   
   
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
   
   
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response)
               params = parameters_matches.group(1).strip()
               ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params)
               if ask_user_question_matcher:
                   ask_user_question = ask_user_question_matcher.group(1)
                   return ask_user_question
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
   
       return None
   
   
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX, sanitized_response)
       if not match:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
   
       tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response)
       tool_name = tool_name_matches.group(1)
       parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response)
       params = parameters_matches.group(1).strip()
   
       action_split = tool_name.split('::')
       verb = action_split[0].strip()
       resource_name = action_split[1].strip()
       function = action_split[2].strip()
   
       xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params)))
       parameters = {}
       for elem in xml_tree.iter():
           if elem.text:
               parameters[elem.tag] = {'value': elem.text.strip('" ')}
   
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
   
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
   
           return parsed_response
   
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
           "verb": verb,
           "actionGroupName": resource_name,
           "apiName": function,
           "actionGroupInput": parameters
       }
   
       return parsed_response
   
   
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
   
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 3 ]

   ```
   import logging
   import re
   import xml.etree.ElementTree as ET
    
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_calls>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
    
   RATIONALE_VALUE_REGEX_LIST = [
       "<thinking>(.*?)(</thinking>)",
       "(.*?)(</thinking>)",
       "(<thinking>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
    
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
    
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_calls>"
    
   ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>"
   ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL)
    
   TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>"
   TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>"
   ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL)
    
    
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
    
   FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)"
    
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
    
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added."
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>."
    
   logger = logging.getLogger()
    
    
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
    
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
    
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
    
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
    
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
    
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
    
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
    
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
    
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
    
       raise Exception("unrecognized prompt type")
    
    
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
    
    
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next(
           (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)),
           None)
    
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
    
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next(
               (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
    
           return rationale
    
       return None
    
    
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
    
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
    
       return None, None
    
    
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
    
    
   def parse_generated_response(sanitized_llm_response):
       results = []
    
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
    
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
    
           text = text_match.group(1).strip()
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
    
       final_response = " ".join([r[0] for r in results])
    
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text,
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
    
       return final_response, generated_response_parts
    
    
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
    
    
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
    
    
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response)
               params = parameters_matches.group(1).strip()
               ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params)
               if ask_user_question_matcher:
                   ask_user_question = ask_user_question_matcher.group(1)
                   return ask_user_question
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
    
       return None
    
    
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX, sanitized_response)
       if not match:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
    
       tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response)
       tool_name = tool_name_matches.group(1)
       parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response)
       params = parameters_matches.group(1).strip()
    
       action_split = tool_name.split('::')
       verb = action_split[0].strip()
       resource_name = action_split[1].strip()
       function = action_split[2].strip()
    
       xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params)))
       parameters = {}
       for elem in xml_tree.iter():
           if elem.text:
               parameters[elem.tag] = {'value': elem.text.strip('" ')}
    
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
    
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
    
           return parsed_response
    
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
           "verb": verb,
           "actionGroupName": resource_name,
           "apiName": function,
           "actionGroupInput": parameters
       }
    
       return parsed_response
    
    
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
    
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 3.5 ]

   ```
   import json
   import logging
   import re
   from collections import defaultdict
   
   RATIONALE_VALUE_REGEX_LIST = [
     "<thinking>(.*?)(</thinking>)",
     "(.*?)(</thinking>)",
     "(<thinking>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in
                               RATIONALE_VALUE_REGEX_LIST]
   
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
   
   ANSWER_TAG = "<answer>"
   ASK_USER = "user__askuser"
   
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
   
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX,
                                              re.DOTALL)
   
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName."
   logger = logging.getLogger()
   
   
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
     logger.setLevel("INFO")
     logger.info("Lambda input: " + str(event))
   
     # Sanitize LLM response
     response = load_response(event['invokeModelRawResponse'])
   
     stop_reason = response["stop_reason"]
     content = response["content"]
     content_by_type = get_content_by_type(content)
   
     # Parse LLM response for any rationale
     rationale = parse_rationale(content_by_type)
   
     # Construct response fields common to all invocation types
     parsed_response = {
       'promptType': "ORCHESTRATION",
       'orchestrationParsedResponse': {
         'rationale': rationale
       }
     }
   
     match stop_reason:
       case 'tool_use':
         # Check if there is an ask user
         try:
           ask_user = parse_ask_user(content_by_type)
           if ask_user:
             parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'ASK_USER',
               'agentAskUser': {
                 'responseText': ask_user,
                 'id': content_by_type['tool_use'][0]['id']
               },
   
             }
   
             logger.info("Ask user parsed response: " + str(parsed_response))
             return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
         # Check if there is an agent action
         try:
           parsed_response = parse_function_call(content_by_type, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       case 'end_turn' | 'stop_sequence':
         # Check if there is a final answer
         try:
           if content_by_type["text"]:
             text_contents = content_by_type["text"]
             for text_content in text_contents:
               final_answer, generated_response_parts = parse_answer(text_content)
               if final_answer:
                 parsed_response['orchestrationParsedResponse'][
                   'responseDetails'] = {
                   'invocationType': 'FINISH',
                   'agentFinalResponse': {
                     'responseText': final_answer
                   }
                 }
   
               if generated_response_parts:
                 parsed_response['orchestrationParsedResponse']['responseDetails'][
                   'agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
                 }
   
               logger.info("Final answer parsed response: " + str(parsed_response))
               return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
       case _:
         addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
         logger.info(parsed_response)
         return parsed_response
   
   
   def load_response(text):
     raw_text = r'{}'.format(text)
     json_text = json.loads(raw_text)
     return json_text
   
   
   def get_content_by_type(content):
     content_by_type = defaultdict(list)
     for content_value in content:
       content_by_type[content_value["type"]].append(content_value)
     return content_by_type
   
   
   def parse_rationale(content_by_type):
     if "text" in content_by_type:
       rationale = content_by_type["text"][0]["text"]
       if rationale is not None:
         rationale_matcher = next(
             (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if
              pattern.search(rationale)),
             None)
         if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
       return rationale
     return None
   
   
   def parse_answer(response):
     if has_generated_response(response["text"].strip()):
       return parse_generated_response(response)
   
     answer_match = ANSWER_PATTERN.search(response["text"].strip())
     if answer_match:
       return answer_match.group(0).strip(), None
   
     return None, None
   
   
   def parse_generated_response(response):
     results = []
   
     for match in ANSWER_PART_PATTERN.finditer(response):
       part = match.group(1).strip()
   
       text_match = ANSWER_TEXT_PART_PATTERN.search(part)
       if not text_match:
         raise ValueError("Could not parse generated response")
   
       text = text_match.group(1).strip()
       references = parse_references(part)
       results.append((text, references))
   
     final_response = " ".join([r[0] for r in results])
   
     generated_response_parts = []
     for text, references in results:
       generatedResponsePart = {
         'text': text,
         'references': references
       }
       generated_response_parts.append(generatedResponsePart)
   
     return final_response, generated_response_parts
   
   
   def has_generated_response(raw_response):
     return ANSWER_PART_PATTERN.search(raw_response) is not None
   
   
   def parse_references(answer_part):
     references = []
     for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
       reference = match.group(1).strip()
       references.append({'sourceId': reference})
     return references
   
   
   def parse_ask_user(content_by_type):
     try:
       if content_by_type["tool_use"][0]["name"] == ASK_USER:
         ask_user_question = content_by_type["tool_use"][0]["input"]["question"]
         if not ask_user_question:
           raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
         return ask_user_question
     except ValueError as ex:
       raise ex
     return None
   
   
   def parse_function_call(content_by_type, parsed_response):
     try:
       content = content_by_type["tool_use"][0]
       tool_name = content["name"]
   
       action_split = tool_name.split('__')
       verb = action_split[0].strip()
       resource_name = action_split[1].strip()
       function = action_split[2].strip()
     except ValueError as ex:
       raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
   
     parameters = {}
     for param, value in content["input"].items():
       parameters[param] = {'value': value}
   
     parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
   
     # Function calls can either invoke an action group or a knowledge base.
     # Mapping to the correct variable names accordingly
     if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
       parsed_response['orchestrationParsedResponse']['responseDetails'][
         'invocationType'] = 'KNOWLEDGE_BASE'
       parsed_response['orchestrationParsedResponse']['responseDetails'][
         'agentKnowledgeBase'] = {
         'searchQuery': parameters['searchQuery'],
         'knowledgeBaseId': resource_name.replace(
             KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''),
         'id': content["id"]
       }
       return parsed_response
     parsed_response['orchestrationParsedResponse']['responseDetails'][
       'invocationType'] = 'ACTION_GROUP'
     parsed_response['orchestrationParsedResponse']['responseDetails'][
       'actionGroupInvocation'] = {
       "verb": verb,
       "actionGroupName": resource_name,
       "apiName": function,
       "actionGroupInput": parameters,
       "id": content["id"]
     }
     return parsed_response
   
   
   def addRepromptResponse(parsed_response, error):
     error_message = str(error)
     logger.warn(error_message)
   
     parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
       'repromptResponse': error_message
     }
   ```

------

1. Pour voir des exemples pour un groupe d’actions défini avec des détails de fonction, cliquez sur l’onglet correspondant au modèle pour lequel vous souhaitez voir des exemples.

------
#### [ Anthropic Claude 2.0 ]

   ```
   import json
   import re
   import logging
    
    
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_call>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
    
   RATIONALE_VALUE_REGEX_LIST = [
       "<scratchpad>(.*?)(</scratchpad>)",
       "(.*?)(</scratchpad>)",
       "(<scratchpad>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
    
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
    
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_call>"
    
   ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
    
   ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\""  
   ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL)
    
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
    
   FUNCTION_CALL_REGEX_API_SCHEMA = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)"
   FUNCTION_CALL_REGEX_FUNCTION_SCHEMA = r"<function_call>(\w+)::(.+)\((.+)\)"
    
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"  
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
    
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added"
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.'
    
   logger = logging.getLogger()
   logger.setLevel("INFO")
    
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
       
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
       
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
       
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
       
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
           
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
           
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
          
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
       
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
               
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
           
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
           
       raise Exception("unrecognized prompt type")
    
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
       
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None)
       
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
           
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
           
           return rationale
       
       return None
       
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
    
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
           
       return None, None
     
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
       
   def parse_generated_response(sanitized_llm_response):
       results = []
       
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
           
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
           
           text = text_match.group(1).strip()        
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
       
       final_response = " ".join([r[0] for r in results])
       
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text, 
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
           
       return final_response, generated_response_parts
    
       
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
    
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
       
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               ask_user = ask_user_matcher.group(2).strip()
               ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user)
               if ask_user_question_matcher:
                   return ask_user_question_matcher.group(1).strip()
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
           
       return None
    
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX_API_SCHEMA, sanitized_response)
       match_function_schema = re.search(FUNCTION_CALL_REGEX_FUNCTION_SCHEMA, sanitized_response)
       if not match and not match_function_schema:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
    
       if match:
           schema_type = 'API'
           verb, resource_name, function, param_arg = match.group(1), match.group(2), match.group(3), match.group(4)
       else:
           schema_type = 'FUNCTION'
           resource_name, function, param_arg = match_function_schema.group(1), match_function_schema.group(2), match_function_schema.group(3)
       
       parameters = {}
       for arg in param_arg.split(","):
           key, value = arg.split("=")
           parameters[key.strip()] = {'value': value.strip('" ')}
           
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
           
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
           
           return parsed_response
       
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       
       if schema_type == 'API':
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "verb": verb, 
               "actionGroupName": resource_name,
               "apiName": function,
               "actionGroupInput": parameters
           }
       else:
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "actionGroupName": resource_name,
               "functionName": function,
               "actionGroupInput": parameters
           }
       
       return parsed_response
       
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
       
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 2.1 ]

   ```
   import logging
   import re
   import xml.etree.ElementTree as ET
    
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_calls>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
    
   RATIONALE_VALUE_REGEX_LIST = [
       "<scratchpad>(.*?)(</scratchpad>)",
       "(.*?)(</scratchpad>)",
       "(<scratchpad>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
    
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
    
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_calls>"
    
   ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>"
   ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL)
    
   TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>"
   TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>"
   ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL)
    
    
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
    
   FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)"
    
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
    
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added."
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>."
    
   logger = logging.getLogger()
   logger.setLevel("INFO")
    
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
    
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
    
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
    
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
    
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
    
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
    
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
    
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
    
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
    
       raise Exception("unrecognized prompt type")
    
    
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
    
    
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next(
           (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)),
           None)
    
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
    
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next(
               (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
    
           return rationale
    
       return None
    
    
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
    
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
    
       return None, None
    
    
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
    
    
   def parse_generated_response(sanitized_llm_response):
       results = []
    
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
    
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
    
           text = text_match.group(1).strip()
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
    
       final_response = " ".join([r[0] for r in results])
    
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text,
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
    
       return final_response, generated_response_parts
    
    
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
    
    
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
    
    
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response)
               params = parameters_matches.group(1).strip()
               ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params)
               if ask_user_question_matcher:
                   ask_user_question = ask_user_question_matcher.group(1)
                   return ask_user_question
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
    
       return None
    
    
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX, sanitized_response)
       if not match:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
    
       tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response)
       tool_name = tool_name_matches.group(1)
       parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response)
       params = parameters_matches.group(1).strip()
    
       action_split = tool_name.split('::')
       schema_type = 'FUNCTION' if len(action_split) == 2 else 'API'
    
       if schema_type == 'API':
           verb = action_split[0].strip()
           resource_name = action_split[1].strip()
           function = action_split[2].strip()
       else:
           resource_name = action_split[0].strip()
           function = action_split[1].strip()
    
       xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params)))
       parameters = {}
       for elem in xml_tree.iter():
           if elem.text:
               parameters[elem.tag] = {'value': elem.text.strip('" ')}
    
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
    
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
    
           return parsed_response
    
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       if schema_type == 'API':
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "verb": verb,
               "actionGroupName": resource_name,
               "apiName": function,
               "actionGroupInput": parameters
           }
       else:
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "actionGroupName": resource_name,
               "functionName": function,
               "actionGroupInput": parameters
           }
    
       return parsed_response
    
    
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
    
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 3 ]

   ```
   import logging
   import re
   import xml.etree.ElementTree as ET
    
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_calls>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
    
   RATIONALE_VALUE_REGEX_LIST = [
       "<thinking>(.*?)(</thinking>)",
       "(.*?)(</thinking>)",
       "(<thinking>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
    
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
    
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_calls>"
    
   ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>"
   ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL)
    
   TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>"
   TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>"
   ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL)
    
    
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
    
   FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)"
    
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
    
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added."
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>."
    
   logger = logging.getLogger()
    
    
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
    
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
    
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
    
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
    
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
    
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
    
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
    
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
    
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
    
       raise Exception("unrecognized prompt type")
    
    
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
    
    
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next(
           (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)),
           None)
    
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
    
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next(
               (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
    
           return rationale
    
       return None
    
    
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
    
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
    
       return None, None
    
    
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
    
    
   def parse_generated_response(sanitized_llm_response):
       results = []
    
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
    
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
    
           text = text_match.group(1).strip()
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
    
       final_response = " ".join([r[0] for r in results])
    
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text,
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
    
       return final_response, generated_response_parts
    
    
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
    
    
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
    
    
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response)
               params = parameters_matches.group(1).strip()
               ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params)
               if ask_user_question_matcher:
                   ask_user_question = ask_user_question_matcher.group(1)
                   return ask_user_question
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
    
       return None
    
    
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX, sanitized_response)
       if not match:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
    
       tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response)
       tool_name = tool_name_matches.group(1)
       parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response)
       params = parameters_matches.group(1).strip()
    
       action_split = tool_name.split('::')
       schema_type = 'FUNCTION' if len(action_split) == 2 else 'API'
    
       if schema_type == 'API':
           verb = action_split[0].strip()
           resource_name = action_split[1].strip()
           function = action_split[2].strip()
       else:
           resource_name = action_split[0].strip()
           function = action_split[1].strip()
    
       xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params)))
       parameters = {}
       for elem in xml_tree.iter():
           if elem.text:
               parameters[elem.tag] = {'value': elem.text.strip('" ')}
    
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
    
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
    
           return parsed_response
    
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       if schema_type == 'API':
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "verb": verb,
               "actionGroupName": resource_name,
               "apiName": function,
               "actionGroupInput": parameters
           }
       else:
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "actionGroupName": resource_name,
               "functionName": function,
               "actionGroupInput": parameters
           }
    
       return parsed_response
    
    
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
    
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 3.5 ]

   ```
   import json
   import logging
   import re
   from collections import defaultdict
   
   RATIONALE_VALUE_REGEX_LIST = [
     "<thinking>(.*?)(</thinking>)",
     "(.*?)(</thinking>)",
     "(<thinking>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in
                               RATIONALE_VALUE_REGEX_LIST]
   
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
   
   ANSWER_TAG = "<answer>"
   ASK_USER = "user__askuser"
   
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
   
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX,
                                              re.DOTALL)
   
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName."
   logger = logging.getLogger()
   
   
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
     logger.setLevel("INFO")
     logger.info("Lambda input: " + str(event))
   
     # Sanitize LLM response
     response = load_response(event['invokeModelRawResponse'])
   
     stop_reason = response["stop_reason"]
     content = response["content"]
     content_by_type = get_content_by_type(content)
   
     # Parse LLM response for any rationale
     rationale = parse_rationale(content_by_type)
   
     # Construct response fields common to all invocation types
     parsed_response = {
       'promptType': "ORCHESTRATION",
       'orchestrationParsedResponse': {
         'rationale': rationale
       }
     }
   
     match stop_reason:
       case 'tool_use':
         # Check if there is an ask user
         try:
           ask_user = parse_ask_user(content_by_type)
           if ask_user:
             parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'ASK_USER',
               'agentAskUser': {
                 'responseText': ask_user,
                 'id': content_by_type['tool_use'][0]['id']
               },
   
             }
   
             logger.info("Ask user parsed response: " + str(parsed_response))
             return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
         # Check if there is an agent action
         try:
           parsed_response = parse_function_call(content_by_type, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       case 'end_turn' | 'stop_sequence':
         # Check if there is a final answer
         try:
           if content_by_type["text"]:
             text_contents = content_by_type["text"]
             for text_content in text_contents:
               final_answer, generated_response_parts = parse_answer(text_content)
               if final_answer:
                 parsed_response['orchestrationParsedResponse'][
                   'responseDetails'] = {
                   'invocationType': 'FINISH',
                   'agentFinalResponse': {
                     'responseText': final_answer
                   }
                 }
   
               if generated_response_parts:
                 parsed_response['orchestrationParsedResponse']['responseDetails'][
                   'agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
                 }
   
               logger.info("Final answer parsed response: " + str(parsed_response))
               return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
       case _:
         addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
         logger.info(parsed_response)
         return parsed_response
   
   
   def load_response(text):
     raw_text = r'{}'.format(text)
     json_text = json.loads(raw_text)
     return json_text
   
   
   def get_content_by_type(content):
     content_by_type = defaultdict(list)
     for content_value in content:
       content_by_type[content_value["type"]].append(content_value)
     return content_by_type
   
   
   def parse_rationale(content_by_type):
     if "text" in content_by_type:
       rationale = content_by_type["text"][0]["text"]
       if rationale is not None:
         rationale_matcher = next(
             (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if
              pattern.search(rationale)),
             None)
         if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
       return rationale
     return None
   
   
   def parse_answer(response):
     if has_generated_response(response["text"].strip()):
       return parse_generated_response(response)
   
     answer_match = ANSWER_PATTERN.search(response["text"].strip())
     if answer_match:
       return answer_match.group(0).strip(), None
   
     return None, None
   
   
   def parse_generated_response(response):
     results = []
   
     for match in ANSWER_PART_PATTERN.finditer(response):
       part = match.group(1).strip()
   
       text_match = ANSWER_TEXT_PART_PATTERN.search(part)
       if not text_match:
         raise ValueError("Could not parse generated response")
   
       text = text_match.group(1).strip()
       references = parse_references(part)
       results.append((text, references))
   
     final_response = " ".join([r[0] for r in results])
   
     generated_response_parts = []
     for text, references in results:
       generatedResponsePart = {
         'text': text,
         'references': references
       }
       generated_response_parts.append(generatedResponsePart)
   
     return final_response, generated_response_parts
   
   
   def has_generated_response(raw_response):
     return ANSWER_PART_PATTERN.search(raw_response) is not None
   
   
   def parse_references(answer_part):
     references = []
     for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
       reference = match.group(1).strip()
       references.append({'sourceId': reference})
     return references
   
   
   def parse_ask_user(content_by_type):
     try:
       if content_by_type["tool_use"][0]["name"] == ASK_USER:
         ask_user_question = content_by_type["tool_use"][0]["input"]["question"]
         if not ask_user_question:
           raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
         return ask_user_question
     except ValueError as ex:
       raise ex
     return None
   
   
   def parse_function_call(content_by_type, parsed_response):
     try:
       content = content_by_type["tool_use"][0]
       tool_name = content["name"]
   
       action_split = tool_name.split('__')
   
       schema_type = 'FUNCTION' if len(action_split) == 2 else 'API'
       if schema_type == 'API':
         verb = action_split[0].strip()
         resource_name = action_split[1].strip()
         function = action_split[2].strip()
       else:
         resource_name = action_split[1].strip()
         function = action_split[2].strip()
   
     except ValueError as ex:
       raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
   
     parameters = {}
     for param, value in content["input"].items():
       parameters[param] = {'value': value}
   
     parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
   
     # Function calls can either invoke an action group or a knowledge base.
     # Mapping to the correct variable names accordingly
     if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
       parsed_response['orchestrationParsedResponse']['responseDetails'][
         'invocationType'] = 'KNOWLEDGE_BASE'
       parsed_response['orchestrationParsedResponse']['responseDetails'][
         'agentKnowledgeBase'] = {
         'searchQuery': parameters['searchQuery'],
         'knowledgeBaseId': resource_name.replace(
             KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''),
         'id': content["id"]
       }
       return parsed_response
     parsed_response['orchestrationParsedResponse']['responseDetails'][
       'invocationType'] = 'ACTION_GROUP'
     if schema_type == 'API':
       parsed_response['orchestrationParsedResponse']['responseDetails'][
         'actionGroupInvocation'] = {
         "verb": verb,
         "actionGroupName": resource_name,
         "apiName": function,
         "actionGroupInput": parameters,
         "id": content["id"]
       }
     else:
       parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
         "actionGroupName": resource_name,
         "functionName": function,
         "actionGroupInput": parameters
        }
     return parsed_response
   
   
   def addRepromptResponse(parsed_response, error):
     error_message = str(error)
     logger.warn(error_message)
   
     parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
       'repromptResponse': error_message
     }
   ```

------

### Génération de réponses de la base de connaissances
<a name="parser-kb"></a>

L’exemple suivant illustre une fonction Lambda d’analyseur de génération de réponses de la base de connaissances écrite en Python.

```
import json
import re
import logging
 
ANSWER_PART_REGEX = "&lt;answer_part\\s?>(.+?)&lt;/answer_part\\s?>"
ANSWER_TEXT_PART_REGEX = "&lt;text\\s?>(.+?)&lt;/text\\s?>"  
ANSWER_REFERENCE_PART_REGEX = "&lt;source\\s?>(.+?)&lt;/source\\s?>"
ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)

logger = logging.getLogger()
 
# This parser lambda is an example of how to parse the LLM output for the default KB response generation prompt
def lambda_handler(event, context):
    logger.info("Lambda input: " + str(event))
    raw_response = event['invokeModelRawResponse']
    
    parsed_response = {
        'promptType': 'KNOWLEDGE_BASE_RESPONSE_GENERATION',
        'knowledgeBaseResponseGenerationParsedResponse': {
            'generatedResponse': parse_generated_response(raw_response)
        }
    }
    
    logger.info(parsed_response)
    return parsed_response
    
def parse_generated_response(sanitized_llm_response):
    results = []
    
    for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
        part = match.group(1).strip()
        
        text_match = ANSWER_TEXT_PART_PATTERN.search(part)
        if not text_match:
            raise ValueError("Could not parse generated response")
        
        text = text_match.group(1).strip()        
        references = parse_references(sanitized_llm_response, part)
        results.append((text, references))
    
    generated_response_parts = []
    for text, references in results:
        generatedResponsePart = {
            'text': text, 
            'references': references
        }
        generated_response_parts.append(generatedResponsePart)
        
    return {
        'generatedResponseParts': generated_response_parts
    }
    
def parse_references(raw_response, answer_part):
    references = []
    for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
        reference = match.group(1).strip()
        references.append({'sourceId': reference})
    return references
```

### Post-traitement
<a name="parser-postprocessing"></a>

L’exemple suivant illustre une fonction Lambda d’analyseur de post-traitement écrite en Python.

```
import json
import re
import logging
 
FINAL_RESPONSE_REGEX = r"&lt;final_response>([\s\S]*?)&lt;/final_response>"
FINAL_RESPONSE_PATTERN = re.compile(FINAL_RESPONSE_REGEX, re.DOTALL)

logger = logging.getLogger()
 
# This parser lambda is an example of how to parse the LLM output for the default PostProcessing prompt
def lambda_handler(event, context):
    logger.info("Lambda input: " + str(event))
    raw_response = event['invokeModelRawResponse']
    
    parsed_response = {
        'promptType': 'POST_PROCESSING',
        'postProcessingParsedResponse': {}
    }
    
    matcher = FINAL_RESPONSE_PATTERN.search(raw_response)
    if not matcher:
        raise Exception("Could not parse raw LLM output")
    response_text = matcher.group(1).strip()
    
    parsed_response['postProcessingParsedResponse']['responseText'] = response_text
    
    logger.info(parsed_response)
    return parsed_response
```

### Synthèse de mémoire
<a name="parser-memory-summarization"></a>

L’exemple suivant illustre une fonction Lambda d’analyseur de synthèse de mémoire écrite en Python.

```
import re
import logging

SUMMARY_TAG_PATTERN = r'<summary>(.*?)</summary>'
TOPIC_TAG_PATTERN = r'<topic name="(.+?)"\s*>(.+?)</topic>'
logger = logging.getLogger()

# This parser lambda is an example of how to parse the LLM output for the default LTM SUmmarization prompt
def lambda_handler(event, context):
    logger.info("Lambda input: " + str(event))
    
    # Sanitize LLM response
    model_response = sanitize_response(event['invokeModelRawResponse'])
    
    if event["promptType"] == "MEMORY_SUMMARIZATION":
        return format_response(parse_llm_response(model_response), event["promptType"])

def format_response(topic_summaries, prompt_type):
    return {
        "promptType": prompt_type,
        "memorySummarizationParsedResponse": {
            "topicwiseSummaries": topic_summaries
        }
    }
    
def parse_llm_response(output: str):
    # First extract content within summary tag
    summary_match = re.search(SUMMARY_TAG_PATTERN, output, re.DOTALL)
    if not summary_match:
        raise Exception("Error while parsing summarizer model output, no summary tag found!")
    
    summary_content = summary_match.group(1)
    topic_summaries = parse_topic_wise_summaries(summary_content)
        
    return topic_summaries

def parse_topic_wise_summaries(content):
    summaries = []
    # Then extract content within topic tag
    for match in re.finditer(TOPIC_TAG_PATTERN, content, re.DOTALL):
        topic_name = match.group(1)
        topic_summary = match.group(2).strip()
        summaries.append({
            'topic': topic_name,
            'summary': topic_summary
        })
    if not summaries:
        raise Exception("Error while parsing summarizer model output, no topics found!")
    return summaries

def sanitize_response(text):
    pattern = r"(\\n*)"
    text = re.sub(pattern, r"\n", text)
    return text
```