本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
此程序示範如何使用 AWS Lambda 接聽 Amazon EventBridge、使用 Amazon Simple Notification Service (SNS) 建立通知,以及將調查結果發佈至 AWS Security Hub,為管理員和安全團隊提供可見性。
設定 Lambda 函數和IAM角色
-
首先設定 AWS Identity and Access Management (IAM) 角色,並定義 Lambda 函數所需的許可。此安全性最佳實務可讓您彈性地指定誰擁有呼叫函數的授權,以及限制授與該使用者的許可。不建議直接在使用者帳戶下執行大多數 AWS 操作,尤其是在管理員帳戶下。
在 開啟IAM主控台https://console.aws.amazon.com/iam/
。 -
使用JSON政策編輯器來建立下方範本中定義的政策。提供您自己的區域和 AWS 帳戶詳細資訊。如需詳細資訊,請參閱在 JSON 索引標籤上建立政策。
{ "Version":"2012-10-17", "Statement":[ { "Sid":"LambdaCertificateExpiryPolicy1", "Effect":"Allow", "Action":"logs:CreateLogGroup", "Resource":"arn:aws:logs:
<region>
:<AWS-ACCT-NUMBER>
:*" }, { "Sid":"LambdaCertificateExpiryPolicy2", "Effect":"Allow", "Action":[ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource":[ "arn:aws:logs:<region>
:<AWS-ACCT-NUMBER>
:log-group:/aws/lambda/handle-expiring-certificates:*" ] }, { "Sid":"LambdaCertificateExpiryPolicy3", "Effect":"Allow", "Action":[ "acm:DescribeCertificate", "acm:GetCertificate", "acm:ListCertificates", "acm:ListTagsForCertificate" ], "Resource":"*" }, { "Sid":"LambdaCertificateExpiryPolicy4", "Effect":"Allow", "Action":"SNS:Publish", "Resource":"*" }, { "Sid":"LambdaCertificateExpiryPolicy5", "Effect":"Allow", "Action":[ "SecurityHub:BatchImportFindings", "SecurityHub:BatchUpdateFindings", "SecurityHub:DescribeHub" ], "Resource":"*" }, { "Sid":"LambdaCertificateExpiryPolicy6", "Effect":"Allow", "Action":"cloudwatch:ListMetrics", "Resource":"*" } ] } -
建立角色並將新政策連接至該IAM角色。如需有關建立IAM角色和連接政策的資訊,請參閱建立 AWS 服務的角色 (主控台)。
在 開啟 AWS Lambda 主控台https://console.aws.amazon.com/lambda/
。 -
建立 Lambda 函數。如需詳細資訊,請參閱使用主控台建立 Lambda 函數。請完成下列步驟:
-
在 Create function (建立函數) 頁面上,選擇 Author from scratch (從頭開始撰寫) 選項來建立函數。
-
在函數名稱欄位中指定名稱,例如 "handle-expiring-certificates"。
-
在 Runtime (執行時間) 清單中選擇 Python 3.8。
-
展開 Change default execution role (變更預設執行角色),然後選擇 se an existing role (使用現有角色)。
-
從 Existing role (現有角色) 清單中選擇您稍早建立的角色。
-
選擇建立函數。
-
在 Function code (函數程式碼) 底下插入以下程式碼:
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. import json import boto3 import os from datetime import datetime, timedelta, timezone # ------------------------------------------- # setup global data # ------------------------------------------- utc = timezone.utc # make today timezone aware today = datetime.now().replace(tzinfo=utc) # set up time window for alert - default to 45 if its missing if os.environ.get('EXPIRY_DAYS') is None: expiry_days = 45 else: expiry_days = int(os.environ['EXPIRY_DAYS']) expiry_window = today + timedelta(days = expiry_days) def lambda_handler(event, context): # if this is coming from the ACM event, its for a single certificate if (event['detail-type'] == "ACM Certificate Approaching Expiration"): response = handle_single_cert(event, context.invoked_function_arn) return { 'statusCode': 200, 'body': response } def handle_single_cert(event, context_arn): cert_client = boto3.client('acm') cert_details = cert_client.describe_certificate(CertificateArn=event['resources'][0]) result = 'The following certificate is expiring within ' + str(expiry_days) + ' days: ' + cert_details['Certificate']['DomainName'] # check the expiry window before logging to Security Hub and sending an SNS if cert_details['Certificate']['NotAfter'] < expiry_window: # This call is the text going into the SNS notification result = result + ' (' + cert_details['Certificate']['CertificateArn'] + ') ' # this call is publishing to SH result = result + ' - ' + log_finding_to_sh(event, cert_details, context_arn) # if there's an SNS topic, publish a notification to it if os.environ.get('SNS_TOPIC_ARN') is None: response = result else: sns_client = boto3.client('sns') response = sns_client.publish(TopicArn=os.environ['SNS_TOPIC_ARN'], Message=result, Subject='Certificate Expiration Notification') return result def log_finding_to_sh(event, cert_details, context_arn): # setup for security hub sh_region = get_sh_region(event['region']) sh_hub_arn = "arn:aws:securityhub:{0}:{1}:hub/default".format(sh_region, event['account']) sh_product_arn = "arn:aws:securityhub:{0}:{1}:product/{1}/default".format(sh_region, event['account']) # check if security hub is enabled, and if the hub arn exists sh_client = boto3.client('securityhub', region_name = sh_region) try: sh_enabled = sh_client.describe_hub(HubArn = sh_hub_arn) # the previous command throws an error indicating the hub doesn't exist or lambda doesn't have rights to it so we'll stop attempting to use it except Exception as error: sh_enabled = None print ('Default Security Hub product doesn\'t exist') response = 'Security Hub disabled' # This is used to generate the URL to the cert in the Security Hub Findings to link directly to it cert_id = right(cert_details['Certificate']['CertificateArn'], 36) if sh_enabled: # set up a new findings list new_findings = [] # add expiring certificate to the new findings list new_findings.append({ "SchemaVersion": "2018-10-08", "Id": cert_id, "ProductArn": sh_product_arn, "GeneratorId": context_arn, "AwsAccountId": event['account'], "Types": [ "Software and Configuration Checks/AWS Config Analysis" ], "CreatedAt": event['time'], "UpdatedAt": event['time'], "Severity": { "Original": '89.0', "Label": 'HIGH' }, "Title": 'Certificate expiration', "Description": 'cert expiry', 'Remediation': { 'Recommendation': { 'Text': 'A new certificate for ' + cert_details['Certificate']['DomainName'] + ' should be imported to replace the existing imported certificate before expiration', 'Url': "https://console.aws.amazon.com/acm/home?region=" + event['region'] + "#/?id=" + cert_id } }, 'Resources': [ { 'Id': event['id'], 'Type': 'ACM Certificate', 'Partition': 'aws', 'Region': event['region'] } ], 'Compliance': {'Status': 'WARNING'} }) # push any new findings to security hub if new_findings: try: response = sh_client.batch_import_findings(Findings=new_findings) if response['FailedCount'] > 0: print("Failed to import {} findings".format(response['FailedCount'])) except Exception as error: print("Error: ", error) raise return json.dumps(response) # function to setup the sh region def get_sh_region(event_region): # security hub findings may need to go to a different region so set that here if os.environ.get('SECURITY_HUB_REGION') is None: sh_region_local = event_region else: sh_region_local = os.environ['SECURITY_HUB_REGION'] return sh_region_local # quick function to trim off right side of a string def right(value, count): # To get right part of string, use negative first index in slice. return value[-count:]
-
在 Environment variables (環境變數) 底下,選擇 Edit (編輯) 並選擇性新增以下變數。
-
(選用) EXPIRY_DAYS
指定傳送憑證過期通知的前置時間 (以天為單位)。此函數預設值為 45 天,但您可以指定自訂值。
-
(選用) SNS_TOPIC_ARN
指定 Amazon ARN的 SNS。ARN 以 arn:aws:sns 格式提供完整:
<region>
:<account-number>
:<topic-name>
. -
(選用) SECURITY_HUB_REGION
AWS Security Hub 指定不同區域中的 。如果沒有指定,便會使用執行中 Lambda 函數使用的區域。如果函數在多個區域中執行,則可能需要將所有憑證訊息移至單一區域中的 Security Hub。
-
-
在 Basic settings (基本設定) 下,將 Timeout (逾時) 設為 30 秒。
-
請在頁面頂端選擇 Deploy (部署)。
-
完成下列程序中的任務,以開始使用此解決方案。
自動執行電子郵件過期通知程序
在此範例中,我們會在透過 Amazon 提出事件時,為每個即將到期的憑證提供單一電子郵件 EventBridge。依預設, 每天會針對過期後 45 天或更短的憑證ACM提出事件。(此期間可以使用 ACM PutAccountConfiguration的操作自訂API。) 這些事件都會觸發下列串聯的自動化動作:
ACM raises Amazon EventBridge event →
>>>>>>> events
Event matches Amazon EventBridge rule →
Rule calls Lambda function →
Function sends SNS email and logs a Finding in Security Hub
-
建立 Lambda 函數並設定許可。(已完成 - 請參閱「設定 Lambda 函數和IAM角色」)。
-
為 Lambda 函數建立標準SNS主題,以用於傳送通知。如需詳細資訊,請參閱建立 Amazon SNS主題。
-
訂閱新SNS主題的任何利益相關方。如需詳細資訊,請參閱訂閱 Amazon SNS主題。
-
建立 Amazon EventBridge 規則以觸發 Lambda 函數。如需詳細資訊,請參閱建立對事件 做出反應的 Amazon EventBridge 規則。
在 Amazon EventBridge 主控台的 中https://console.aws.amazon.com/events/
,導覽至事件 > 規則頁面,然後選擇建立規則 。指定 Service Name (服務名稱)、Event Type (事件類型)以及 Lambda function (Lambda 函數)。在 Event Pattern preview (事件模式預覽) 編輯器中,貼上以下程式碼: { "source": [ "aws.acm" ], "detail-type": [ "ACM Certificate Approaching Expiration" ] }
事件 (例如 Lambda 接收的事件) 會顯示在 Show sample event(s) (顯示範例事件) 底下:
{ "version": "0", "id": "9c95e8e4-96a4-ef3f-b739-b6aa5b193afb", "detail-type": "ACM Certificate Approaching Expiration", "source": "aws.acm", "account": "123456789012", "time": "2020-09-30T06:51:08Z", "region": "us-east-1", "resources": [ "arn:aws:acm:us-east-1:123456789012:certificate/61f50cd4-45b9-4259-b049-d0a53682fa4b" ], "detail": { "DaysToExpiry": 31, "CommonName": "My Awesome Service" } }
清理方式
一旦您不再需要範例組態或任何組態,最佳實務是移除該組態的所有軌跡,避免安全問題和未來的非預期費用:
-
IAM 政策和角色
-
Lambda 函數
-
CloudWatch 事件規則
-
CloudWatch 與 Lambda 相關聯的日誌
-
SNS 主題