AWS 文档 SDK 示例 GitHub 存储库中还有更多 AWS SDK 示例。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用适用于 Python 的 SDK (Boto3) 的 Audit Manager 示例
以下代码示例向您展示了如何将 AWS SDK for Python (Boto3) 与 Audit Manager 配合使用来执行操作和实现常见场景。
场景是向您展示如何通过在一个服务中调用多个函数或与其他 AWS 服务结合来完成特定任务的代码示例。
每个示例都包含一个指向完整源代码的链接,您可以在其中找到有关如何在上下文中设置和运行代码的说明。
主题
场景
以下代码示例展示了如何:
获取 AWS Config 一致性包清单。
为一致性包中的每条托管规则创建 Audit Manager 自定义控制。
创建包含控制的 Audit Manager 自定义框架。
- 适用于 Python 的 SDK (Boto3)
-
注意
在 GitHub 上还有更多相关信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 import logging import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) class ConformancePack: def __init__(self, config_client, auditmanager_client): self.config_client = config_client self.auditmanager_client = auditmanager_client def get_conformance_pack(self): """ Return a selected conformance pack from the list of conformance packs. :return: selected conformance pack """ try: conformance_packs = self.config_client.describe_conformance_packs() print( "Number of conformance packs fetched: ", len(conformance_packs.get("ConformancePackDetails")), ) print("Fetched the following conformance packs: ") all_cpack_names = { cp["ConformancePackName"] for cp in conformance_packs.get("ConformancePackDetails") } for pack in all_cpack_names: print(f"\t{pack}") cpack_name = input( "Provide ConformancePackName that you want to create a custom " "framework for: " ) if cpack_name not in all_cpack_names: print(f"{cpack_name} is not in the list of conformance packs!") print( "Provide a conformance pack name from the available list of " "conformance packs." ) raise Exception("Invalid conformance pack") print("-" * 88) except ClientError: logger.exception("Couldn't select conformance pack.") raise else: return cpack_name def create_custom_controls(self, cpack_name): """ Create custom controls for all managed AWS Config rules in a conformance pack. :param cpack_name: The name of the conformance pack to create controls for. :return: The list of custom control IDs. """ try: rules_in_pack = self.config_client.describe_conformance_pack_compliance( ConformancePackName=cpack_name ) print( "Number of rules in the conformance pack: ", len(rules_in_pack.get("ConformancePackRuleComplianceList")), ) for rule in rules_in_pack.get("ConformancePackRuleComplianceList"): print(f"\t{rule.get('ConfigRuleName')}") print("-" * 88) print( "Creating a custom control for each rule and a custom framework " "consisting of these rules in Audit Manager." 
) am_controls = [] for rule in rules_in_pack.get("ConformancePackRuleComplianceList"): config_rule = self.config_client.describe_config_rules( ConfigRuleNames=[rule.get("ConfigRuleName")] ) source_id = ( config_rule.get("ConfigRules")[0] .get("Source", {}) .get("SourceIdentifier") ) custom_control = self.auditmanager_client.create_control( name="Config-" + rule.get("ConfigRuleName"), controlMappingSources=[ { "sourceName": "ConfigRule", "sourceSetUpOption": "System_Controls_Mapping", "sourceType": "AWS_Config", "sourceKeyword": { "keywordInputType": "SELECT_FROM_LIST", "keywordValue": source_id, }, } ], ).get("control", {}) am_controls.append({"id": custom_control.get("id")}) print("Successfully created a control for each config rule.") print("-" * 88) except ClientError: logger.exception("Failed to create custom controls.") raise else: return am_controls def create_custom_framework(self, cpack_name, am_control_ids): """ Create a custom Audit Manager framework from a selected AWS Config conformance pack. :param cpack_name: The name of the conformance pack to create a framework from. :param am_control_ids: The IDs of the custom controls created from the conformance pack. """ try: print("Creating custom framework...") custom_framework = self.auditmanager_client.create_assessment_framework( name="Config-Conformance-pack-" + cpack_name, controlSets=[{"name": cpack_name, "controls": am_control_ids}], ) print( f"Successfully created the custom framework: ", f"{custom_framework.get('framework').get('name')}: ", f"{custom_framework.get('framework').get('id')}", ) print("-" * 88) except ClientError: logger.exception("Failed to create custom framework.") raise def run_demo(): print("-" * 88) print("Welcome to the AWS Audit Manager custom framework demo!") print("-" * 88) print( "You can use this sample to select a conformance pack from AWS Config and " "use AWS Audit Manager to create a custom control for all the managed " "rules under the conformance pack. 
A custom framework is also created " "with these controls." ) print("-" * 88) conf_pack = ConformancePack(boto3.client("config"), boto3.client("auditmanager")) cpack_name = conf_pack.get_conformance_pack() am_controls = conf_pack.create_custom_controls(cpack_name) conf_pack.create_custom_framework(cpack_name, am_controls) if __name__ == "__main__": run_demo()
-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的以下主题。
-
以下代码示例展示了如何:
获取所有以 Security Hub 作为其数据源的标准控制的列表。
创建包含控制的 Audit Manager 自定义框架。
- 适用于 Python 的 SDK (Boto3)
-
注意
在 GitHub 上还有更多相关信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 import logging import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) class SecurityHub: def __init__(self, auditmanager_client): self.auditmanager_client = auditmanager_client def get_sechub_controls(self): """ Gets the list of controls that use Security Hub as their data source. :return: The list of Security Hub controls. """ print("-" * 88) next_token = None page = 1 sechub_control_list = [] while True: print("Page [" + str(page) + "]") if next_token is None: control_list = self.auditmanager_client.list_controls( controlType="Standard", maxResults=100 ) else: control_list = self.auditmanager_client.list_controls( controlType="Standard", nextToken=next_token, maxResults=100 ) print("Total controls found:", len(control_list.get("controlMetadataList"))) for control in control_list.get("controlMetadataList"): control_details = self.auditmanager_client.get_control( controlId=control.get("id") ).get("control", {}) if "AWS Security Hub" in control_details.get("controlSources"): sechub_control_list.append({"id": control_details.get("id")}) next_token = control_list.get("nextToken") if not next_token: break page += 1 print("Number of Security Hub controls found: ", len(sechub_control_list)) return sechub_control_list def create_custom_framework(self, am_controls): """ Create a custom framework with a list of controls. :param am_controls: The list of controls to include in the framework. 
""" try: print("Creating custom framework...") custom_framework = self.auditmanager_client.create_assessment_framework( name="All Security Hub Controls Framework", controlSets=[{"name": "Security-Hub", "controls": am_controls}], ) print( f"Successfully created the custom framework: " f"{custom_framework.get('framework').get('name')}: " f"{custom_framework.get('framework').get('id')}" ) print("-" * 88) except ClientError: logger.exception("Failed to create custom framework.") raise def run_demo(): print("-" * 88) print("Welcome to the AWS Audit Manager Security Hub demo!") print("-" * 88) print(" This script creates a custom framework with all Security Hub controls.") print("-" * 88) sechub = SecurityHub(boto3.client("auditmanager")) am_controls = sechub.get_sechub_controls() sechub.create_custom_framework(am_controls) if __name__ == "__main__": run_demo()
-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的以下主题。
-
以下代码示例演示了如何创建包含一天证据的 Audit Manager 评估报告。
- 适用于 Python 的 SDK (Boto3)
-
注意
在 GitHub 上还有更多相关信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 import dateutil.parser import logging import time import urllib.request import uuid import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) class AuditReport: def __init__(self, auditmanager_client): self.auditmanager_client = auditmanager_client def get_input(self): print("-" * 40) try: assessment_id = input("Provide assessment id [uuid]: ").lower() try: assessment_uuid = uuid.UUID(assessment_id) except ValueError: logger.error("Assessment Id is not a valid UUID: %s", assessment_id) raise evidence_folder = input("Provide evidence date [yyyy-mm-dd]: ") try: evidence_date = dateutil.parser.parse(evidence_folder).date() except ValueError: logger.error("Invalid date : %s", evidence_folder) raise try: self.auditmanager_client.get_assessment( assessmentId=str(assessment_uuid) ) except ClientError: logger.exception("Couldn't get assessment %s.", assessment_uuid) raise except (ValueError, ClientError): return None, None else: return assessment_uuid, evidence_date def clear_staging(self, assessment_uuid, evidence_date): """ Find all the evidence in the report and clear it. 
""" next_token = None page = 1 interested_folder_id_list = [] while True: print(f"Page [{page}]") if next_token is None: folder_list = ( self.auditmanager_client.get_evidence_folders_by_assessment( assessmentId=str(assessment_uuid), maxResults=1000 ) ) else: folder_list = ( self.auditmanager_client.get_evidence_folders_by_assessment( assessmentId=str(assessment_uuid), nextToken=next_token, maxResults=1000, ) ) folders = folder_list.get("evidenceFolders") print(f"Got {len(folders)} folders.") for folder in folders: folder_id = folder.get("id") if folder.get("name") == str(evidence_date): interested_folder_id_list.append(folder_id) if folder.get("assessmentReportSelectionCount") == folder.get( "totalEvidence" ): print( f"Removing folder from report selection : {folder.get('name')} " f"{folder_id} {folder.get('controlId')}" ) self.auditmanager_client.disassociate_assessment_report_evidence_folder( assessmentId=str(assessment_uuid), evidenceFolderId=folder_id ) elif folder.get("assessmentReportSelectionCount") > 0: # Get all evidence in the folder and # add selected evidence in the selected_evidence_list. 
evidence_list = ( self.auditmanager_client.get_evidence_by_evidence_folder( assessmentId=str(assessment_uuid), controlSetId=folder_id, evidenceFolderId=folder_id, maxResults=1000, ) ) selected_evidence_list = [] for evidence in evidence_list.get("evidence"): if evidence.get("assessmentReportSelection") == "Yes": selected_evidence_list.append(evidence.get("id")) print( f"Removing evidence report selection : {folder.get('name')} " f"{len(selected_evidence_list)}" ) self.auditmanager_client.batch_disassociate_assessment_report_evidence( assessmentId=str(assessment_uuid), evidenceFolderId=folder_id, evidenceIds=selected_evidence_list, ) next_token = folder_list.get("nextToken") if not next_token: break page += 1 return interested_folder_id_list def add_folder_to_staging(self, assessment_uuid, folder_id_list): print(f"Adding folders to report : {folder_id_list}") for folder in folder_id_list: self.auditmanager_client.associate_assessment_report_evidence_folder( assessmentId=str(assessment_uuid), evidenceFolderId=folder ) def get_report(self, assessment_uuid): report = self.auditmanager_client.create_assessment_report( name="ReportViaScript", description="testing", assessmentId=str(assessment_uuid), ) if self._is_report_generated(report.get("assessmentReport").get("id")): report_url = self.auditmanager_client.get_assessment_report_url( assessmentReportId=report.get("assessmentReport").get("id"), assessmentId=str(assessment_uuid), ) print(report_url.get("preSignedUrl")) urllib.request.urlretrieve( report_url.get("preSignedUrl").get("link"), report_url.get("preSignedUrl").get("hyperlinkName"), ) print( f"Report saved as {report_url.get('preSignedUrl').get('hyperlinkName')}." ) else: print("Report generation did not finish in 15 minutes.") print( "Failed to download report. Go to the console and manually download " "the report." 
) def _is_report_generated(self, assessment_report_id): max_wait_time = 0 while max_wait_time < 900: print(f"Checking status of the report {assessment_report_id}") report_list = self.auditmanager_client.list_assessment_reports(maxResults=1) if ( report_list.get("assessmentReports")[0].get("id") == assessment_report_id and report_list.get("assessmentReports")[0].get("status") == "COMPLETE" ): return True print("Sleeping for 5 seconds...") time.sleep(5) max_wait_time += 5 def run_demo(): print("-" * 88) print("Welcome to the AWS Audit Manager samples demo!") print("-" * 88) print( "This script creates an assessment report for an assessment with all the " "evidence collected on the provided date." ) print("-" * 88) report = AuditReport(boto3.client("auditmanager")) assessment_uuid, evidence_date = report.get_input() if assessment_uuid is not None and evidence_date is not None: folder_id_list = report.clear_staging(assessment_uuid, evidence_date) report.add_folder_to_staging(assessment_uuid, folder_id_list) report.get_report(assessment_uuid) if __name__ == "__main__": run_demo()
-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的以下主题。
-