

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Python 对自定义 CloudFormation 挂钩进行建模
<a name="hooks-model-python"></a>

对自定义 CloudFormation Hook 进行建模涉及创建一个用于定义 Hook、其属性和属性的架构。本教程将引导你使用 Python 对自定义挂钩进行建模。

## 步骤 1：生成 Hook 项目包
<a name="model-hook-project-package-python"></a>

生成你的 Hook 项目包。 CloudFormation CLI 创建空的处理程序函数，这些函数对应于 Hook 规范中定义的目标生命周期中的特定 Hook 操作。

```
cfn generate
```

该命令将返回以下输出。

```
Generated files for MyCompany::Testing::MyTestHook
```

**注意**  
确保您的 Lambda 运行时 up-to-date避免使用已弃用的版本。有关更多信息，请参阅[更新资源类型的 Lambda 运行时和挂钩](https://docs.aws.amazon.com/cloudformation-cli/latest/userguide/runtime-update.html)。

## 第 2 步：添加 Hook 处理程序
<a name="model-hook-project-add-handler"></a>

将您自己的 Hook 处理程序运行时代码添加到您选择实现的处理程序中。例如，您可以添加以下用于记录的代码。

```
LOG.setLevel(logging.INFO)
LOG.info("Internal testing Hook triggered for target: " + request.hookContext.targetName);
```

C CloudFormation LI 从生成`src/models.py`文件[挂钩配置架构语法参考](hook-configuration-schema.md)。

**Example models.py**  

```
import sys
from dataclasses import dataclass
from inspect import getmembers, isclass
from typing import (
    AbstractSet,
    Any,
    Generic,
    Mapping,
    MutableMapping,
    Optional,
    Sequence,
    Type,
    TypeVar,
)

from cloudformation_cli_python_lib.interface import (
    BaseModel,
    BaseHookHandlerRequest,
)
from cloudformation_cli_python_lib.recast import recast_object
from cloudformation_cli_python_lib.utils import deserialize_list

T = TypeVar("T")


def set_or_none(value: Optional[Sequence[T]]) -> Optional[AbstractSet[T]]:
    if value:
        return set(value)
    return None


@dataclass
class HookHandlerRequest(BaseHookHandlerRequest):
    pass


@dataclass
class TypeConfigurationModel(BaseModel):
    limitSize: Optional[str]
    cidr: Optional[str]
    encryptionAlgorithm: Optional[str]

    @classmethod
    def _deserialize(
        cls: Type["_TypeConfigurationModel"],
        json_data: Optional[Mapping[str, Any]],
    ) -> Optional["_TypeConfigurationModel"]:
        if not json_data:
            return None
        return cls(
            limitSize=json_data.get("limitSize"),
            cidr=json_data.get("cidr"),
            encryptionAlgorithm=json_data.get("encryptionAlgorithm"),
        )


_TypeConfigurationModel = TypeConfigurationModel
```

## 第 3 步：实现挂钩处理程序
<a name="model-hook-project-code-handler-python"></a>

生成的 Python 数据类后，你可以编写实际实现 Hook 功能的处理程序。在此示例中，您将为处理程序实现`preCreate``preUpdate`、和`preDelete`调用点。

**Topics**
+ [实现预创建处理程序](#model-hook-project-code-handler-python-precreate)
+ [实现预更新处理程序](#model-hook-project-code-handler-python-preupdate)
+ [实现预删除处理程序](#model-hook-project-code-handler-python-predelete)
+ [实现一个 Hook 处理程序](#model-hook-project-code-handler-python-hook-handler)

### 实现预创建处理程序
<a name="model-hook-project-code-handler-python-precreate"></a>

`preCreate`处理程序验证`AWS::S3::Bucket `或`AWS::SQS::Queue`资源的服务器端加密设置。
+ 对于`AWS::S3::Bucket`资源，只有在满足以下条件时，Hook 才会通过。
  + Amazon S3 存储桶加密已设置。
  + 已为该存储桶启用了 Amazon S3 存储桶密钥。
  + 为 Amazon S3 存储桶设置的加密算法是所需的正确算法。
  + 密 AWS Key Management Service 钥 ID 已设置。
+ 对于`AWS::SQS::Queue`资源，只有在满足以下条件时，Hook 才会通过。
  + 密 AWS Key Management Service 钥 ID 已设置。

### 实现预更新处理程序
<a name="model-hook-project-code-handler-python-preupdate"></a>

实现一个`preUpdate`处理程序，该处理程序在处理程序中所有指定目标的更新操作之前启动。该`preUpdate`处理程序完成以下任务：
+ 对于`AWS::S3::Bucket`资源，只有在满足以下条件时，Hook 才会通过：
  + Amazon S3 存储桶的存储桶加密算法尚未修改。

### 实现预删除处理程序
<a name="model-hook-project-code-handler-python-predelete"></a>

实现一个`preDelete`处理程序，该处理程序在处理程序中所有指定目标的删除操作之前启动。该`preDelete`处理程序完成以下任务：
+ 对于`AWS::S3::Bucket`资源，只有在满足以下条件时，Hook 才会通过：
  + 验证删除资源后账户中是否存在所需的最低合规资源。
  + 所需的最低合规资源量在 Hook 的配置中设置。

### 实现一个 Hook 处理程序
<a name="model-hook-project-code-handler-python-hook-handler"></a>

1. 在 IDE 中，打开位于`src`文件夹中的`handlers.py`文件。

1. 用以下代码替换`handlers.py`文件的全部内容。  
**Example handlers.py**  

   ```
   import logging
   from typing import Any, MutableMapping, Optional
   import botocore
   
   from cloudformation_cli_python_lib import (
       BaseHookHandlerRequest,
       HandlerErrorCode,
       Hook,
       HookInvocationPoint,
       OperationStatus,
       ProgressEvent,
       SessionProxy,
       exceptions,
   )
   
   from .models import HookHandlerRequest, TypeConfigurationModel
   
   # Use this logger to forward log messages to CloudWatch Logs.
   LOG = logging.getLogger(__name__)
   TYPE_NAME = "MyCompany::Testing::MyTestHook"
   
   LOG.setLevel(logging.INFO)
   
   hook = Hook(TYPE_NAME, TypeConfigurationModel)
   test_entrypoint = hook.test_entrypoint
   
   
   def _validate_s3_bucket_encryption(
       bucket: MutableMapping[str, Any], required_encryption_algorithm: str
   ) -> ProgressEvent:
       status = None
       message = ""
       error_code = None
   
       if bucket:
           bucket_name = bucket.get("BucketName")
   
           bucket_encryption = bucket.get("BucketEncryption")
           if bucket_encryption:
               server_side_encryption_rules = bucket_encryption.get(
                   "ServerSideEncryptionConfiguration"
               )
               if server_side_encryption_rules:
                   for rule in server_side_encryption_rules:
                       bucket_key_enabled = rule.get("BucketKeyEnabled")
                       if bucket_key_enabled:
                           server_side_encryption_by_default = rule.get(
                               "ServerSideEncryptionByDefault"
                           )
   
                           encryption_algorithm = server_side_encryption_by_default.get(
                               "SSEAlgorithm"
                           )
                           kms_key_id = server_side_encryption_by_default.get(
                               "KMSMasterKeyID"
                           )  # "KMSMasterKeyID" is name of the property for an AWS::S3::Bucket
   
                           if encryption_algorithm == required_encryption_algorithm:
                               if encryption_algorithm == "aws:kms" and not kms_key_id:
                                   status = OperationStatus.FAILED
                                   message = f"KMS Key ID not set for bucket with name: f{bucket_name}"
                               else:
                                   status = OperationStatus.SUCCESS
                                   message = f"Successfully invoked PreCreateHookHandler for AWS::S3::Bucket with name: {bucket_name}"
                           else:
                               status = OperationStatus.FAILED
                               message = f"SSE Encryption Algorithm is incorrect for bucket with name: {bucket_name}"
                       else:
                           status = OperationStatus.FAILED
                           message = f"Bucket key not enabled for bucket with name: {bucket_name}"
   
                       if status == OperationStatus.FAILED:
                           break
               else:
                   status = OperationStatus.FAILED
                   message = f"No SSE Encryption configurations for bucket with name: {bucket_name}"
           else:
               status = OperationStatus.FAILED
               message = (
                   f"Bucket Encryption not enabled for bucket with name: {bucket_name}"
               )
       else:
           status = OperationStatus.FAILED
           message = "Resource properties for S3 Bucket target model are empty"
   
       if status == OperationStatus.FAILED:
           error_code = HandlerErrorCode.NonCompliant
   
       return ProgressEvent(status=status, message=message, errorCode=error_code)
   
   
   def _validate_sqs_queue_encryption(queue: MutableMapping[str, Any]) -> ProgressEvent:
       if not queue:
           return ProgressEvent(
               status=OperationStatus.FAILED,
               message="Resource properties for SQS Queue target model are empty",
               errorCode=HandlerErrorCode.NonCompliant,
           )
       queue_name = queue.get("QueueName")
   
       kms_key_id = queue.get(
           "KmsMasterKeyId"
       )  # "KmsMasterKeyId" is name of the property for an AWS::SQS::Queue
       if not kms_key_id:
           return ProgressEvent(
               status=OperationStatus.FAILED,
               message=f"Server side encryption turned off for queue with name: {queue_name}",
               errorCode=HandlerErrorCode.NonCompliant,
           )
   
       return ProgressEvent(
           status=OperationStatus.SUCCESS,
           message=f"Successfully invoked PreCreateHookHandler for targetAWS::SQS::Queue with name: {queue_name}",
       )
   
   
   @hook.handler(HookInvocationPoint.CREATE_PRE_PROVISION)
   def pre_create_handler(
       session: Optional[SessionProxy],
       request: HookHandlerRequest,
       callback_context: MutableMapping[str, Any],
       type_configuration: TypeConfigurationModel,
   ) -> ProgressEvent:
       target_name = request.hookContext.targetName
       if "AWS::S3::Bucket" == target_name:
           return _validate_s3_bucket_encryption(
               request.hookContext.targetModel.get("resourceProperties"),
               type_configuration.encryptionAlgorithm,
           )
       elif "AWS::SQS::Queue" == target_name:
           return _validate_sqs_queue_encryption(
               request.hookContext.targetModel.get("resourceProperties")
           )
       else:
           raise exceptions.InvalidRequest(f"Unknown target type: {target_name}")
   
   
   def _validate_bucket_encryption_rules_not_updated(
       resource_properties, previous_resource_properties
   ) -> ProgressEvent:
       bucket_encryption_configs = resource_properties.get("BucketEncryption", {}).get(
           "ServerSideEncryptionConfiguration", []
       )
       previous_bucket_encryption_configs = previous_resource_properties.get(
           "BucketEncryption", {}
       ).get("ServerSideEncryptionConfiguration", [])
   
       if len(bucket_encryption_configs) != len(previous_bucket_encryption_configs):
           return ProgressEvent(
               status=OperationStatus.FAILED,
               message=f"Current number of bucket encryption configs does not match previous. Current has {str(len(bucket_encryption_configs))} configs while previously there were {str(len(previous_bucket_encryption_configs))} configs",
               errorCode=HandlerErrorCode.NonCompliant,
           )
   
       for i in range(len(bucket_encryption_configs)):
           current_encryption_algorithm = (
               bucket_encryption_configs[i]
               .get("ServerSideEncryptionByDefault", {})
               .get("SSEAlgorithm")
           )
           previous_encryption_algorithm = (
               previous_bucket_encryption_configs[i]
               .get("ServerSideEncryptionByDefault", {})
               .get("SSEAlgorithm")
           )
   
           if current_encryption_algorithm != previous_encryption_algorithm:
               return ProgressEvent(
                   status=OperationStatus.FAILED,
                   message=f"Bucket Encryption algorithm can not be changed once set. The encryption algorithm was changed to {current_encryption_algorithm} from {previous_encryption_algorithm}.",
                   errorCode=HandlerErrorCode.NonCompliant,
               )
   
       return ProgressEvent(
           status=OperationStatus.SUCCESS,
           message="Successfully invoked PreUpdateHookHandler for target: AWS::SQS::Queue",
       )
   
   
   def _validate_queue_encryption_not_disabled(
       resource_properties, previous_resource_properties
   ) -> ProgressEvent:
       if previous_resource_properties.get(
           "KmsMasterKeyId"
       ) and not resource_properties.get("KmsMasterKeyId"):
           return ProgressEvent(
               status=OperationStatus.FAILED,
               errorCode=HandlerErrorCode.NonCompliant,
               message="Queue encryption can not be disable",
           )
       else:
           return ProgressEvent(status=OperationStatus.SUCCESS)
   
   
   @hook.handler(HookInvocationPoint.UPDATE_PRE_PROVISION)
   def pre_update_handler(
       session: Optional[SessionProxy],
       request: BaseHookHandlerRequest,
       callback_context: MutableMapping[str, Any],
       type_configuration: MutableMapping[str, Any],
   ) -> ProgressEvent:
       target_name = request.hookContext.targetName
       if "AWS::S3::Bucket" == target_name:
           resource_properties = request.hookContext.targetModel.get("resourceProperties")
           previous_resource_properties = request.hookContext.targetModel.get(
               "previousResourceProperties"
           )
   
           return _validate_bucket_encryption_rules_not_updated(
               resource_properties, previous_resource_properties
           )
       elif "AWS::SQS::Queue" == target_name:
           resource_properties = request.hookContext.targetModel.get("resourceProperties")
           previous_resource_properties = request.hookContext.targetModel.get(
               "previousResourceProperties"
           )
   
           return _validate_queue_encryption_not_disabled(
               resource_properties, previous_resource_properties
           )
       else:
           raise exceptions.InvalidRequest(f"Unknown target type: {target_name}")
   ```

继续进入下一个主题 [向注册自定义 Hook CloudFormation](registering-hooks.md)。