本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Lambda 函数响应事件
此过程演示 AWS Lambda 如何使用在 Amazon 上监听 EventBridge、使用亚马逊简单通知服务 (SNS) 创建通知以及如何向其发布调查结果 AWS Security Hub,从而为管理员和安全团队提供可见性。
设置 Lambda 函数和角色 IAM
-
首先配置 AWS Identity and Access Management (IAM) 角色并定义 Lambda 函数所需的权限。通过此安全性最佳实践,您可以灵活地指定谁有权调用该函数,并限制授予该用户的权限。不建议直接在用户帐户下运行大多数 AWS 操作,尤其不要在管理员帐户下运行。
打开IAM控制台,网址为https://console.aws.amazon.com/iam/
。 -
使用JSON策略编辑器创建在以下模板中定义的策略。提供您自己的地区和 AWS 账户详情。有关更多信息,请参阅JSON选项卡上的创建策略。
{ "Version":"2012-10-17", "Statement":[ { "Sid":"LambdaCertificateExpiryPolicy1", "Effect":"Allow", "Action":"logs:CreateLogGroup", "Resource":"arn:aws:logs:
<region>
:<AWS-ACCT-NUMBER>
:*" }, { "Sid":"LambdaCertificateExpiryPolicy2", "Effect":"Allow", "Action":[ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource":[ "arn:aws:logs:<region>
:<AWS-ACCT-NUMBER>
:log-group:/aws/lambda/handle-expiring-certificates:*" ] }, { "Sid":"LambdaCertificateExpiryPolicy3", "Effect":"Allow", "Action":[ "acm:DescribeCertificate", "acm:GetCertificate", "acm:ListCertificates", "acm:ListTagsForCertificate" ], "Resource":"*" }, { "Sid":"LambdaCertificateExpiryPolicy4", "Effect":"Allow", "Action":"SNS:Publish", "Resource":"*" }, { "Sid":"LambdaCertificateExpiryPolicy5", "Effect":"Allow", "Action":[ "SecurityHub:BatchImportFindings", "SecurityHub:BatchUpdateFindings", "SecurityHub:DescribeHub" ], "Resource":"*" }, { "Sid":"LambdaCertificateExpiryPolicy6", "Effect":"Allow", "Action":"cloudwatch:ListMetrics", "Resource":"*" } ] } -
创建IAM角色并将新策略附加到该角色。有关创建IAM角色和附加策略的信息,请参阅为 AWS 服务创建角色(控制台)。
打开 AWS Lambda 控制台,网址为https://console.aws.amazon.com/lambda/
。 -
创建 Lambda 函数。有关更多信息,请参阅使用控制台创建 Lambda 函数。完成以下步骤:
-
在创建函数页面上,选择 Author from scratch(从头开始创作)选项以创建函数。
-
在函数名称字段中指定一个名称,例如 handle-expiring-certificates “”。
-
在 Runtime(运行时)列表中,选择“Python 3.8”。
-
展开 Change default execution role(更改默认执行角色),然后选择 Use an existing role(使用现有角色)。
-
从 Existing role(现有角色)列表中选择您先前创建的角色。
-
选择 Create function (创建函数)。
-
在 Function code(函数代码)下,插入以下代码。
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. import json import boto3 import os from datetime import datetime, timedelta, timezone # ------------------------------------------- # setup global data # ------------------------------------------- utc = timezone.utc # make today timezone aware today = datetime.now().replace(tzinfo=utc) # set up time window for alert - default to 45 if its missing if os.environ.get('EXPIRY_DAYS') is None: expiry_days = 45 else: expiry_days = int(os.environ['EXPIRY_DAYS']) expiry_window = today + timedelta(days = expiry_days) def lambda_handler(event, context): # if this is coming from the ACM event, its for a single certificate if (event['detail-type'] == "ACM Certificate Approaching Expiration"): response = handle_single_cert(event, context.invoked_function_arn) return { 'statusCode': 200, 'body': response } def handle_single_cert(event, context_arn): cert_client = boto3.client('acm') cert_details = cert_client.describe_certificate(CertificateArn=event['resources'][0]) result = 'The following certificate is expiring within ' + str(expiry_days) + ' days: ' + cert_details['Certificate']['DomainName'] # check the expiry window before logging to Security Hub and sending an SNS if cert_details['Certificate']['NotAfter'] < expiry_window: # This call is the text going into the SNS notification result = result + ' (' + cert_details['Certificate']['CertificateArn'] + ') ' # this call is publishing to SH result = result + ' - ' + log_finding_to_sh(event, cert_details, context_arn) # if there's an SNS topic, publish a notification to it if os.environ.get('SNS_TOPIC_ARN') is None: response = result else: sns_client = boto3.client('sns') response = sns_client.publish(TopicArn=os.environ['SNS_TOPIC_ARN'], Message=result, Subject='Certificate Expiration Notification') return result def log_finding_to_sh(event, cert_details, context_arn): # setup for security hub sh_region = get_sh_region(event['region']) sh_hub_arn = "arn:aws:securityhub:{0}:{1}:hub/default".format(sh_region, event['account']) sh_product_arn = "arn:aws:securityhub:{0}:{1}:product/{1}/default".format(sh_region, event['account']) # check if security hub is enabled, and if the hub arn exists sh_client = boto3.client('securityhub', region_name = sh_region) try: sh_enabled = sh_client.describe_hub(HubArn = sh_hub_arn) # the previous command throws an error indicating the hub doesn't exist or lambda doesn't have rights to it so we'll stop attempting to use it except Exception as error: sh_enabled = None print ('Default Security Hub product doesn\'t exist') response = 'Security Hub disabled' # This is used to generate the URL to the cert in the Security Hub Findings to link directly to it cert_id = right(cert_details['Certificate']['CertificateArn'], 36) if sh_enabled: # set up a new findings list new_findings = [] # add expiring certificate to the new findings list new_findings.append({ "SchemaVersion": "2018-10-08", "Id": cert_id, "ProductArn": sh_product_arn, "GeneratorId": context_arn, "AwsAccountId": event['account'], "Types": [ "Software and Configuration Checks/AWS Config Analysis" ], "CreatedAt": event['time'], "UpdatedAt": event['time'], "Severity": { "Original": '89.0', "Label": 'HIGH' }, "Title": 'Certificate expiration', "Description": 'cert expiry', 'Remediation': { 'Recommendation': { 'Text': 'A new certificate for ' + cert_details['Certificate']['DomainName'] + ' should be imported to replace the existing imported certificate before expiration', 'Url': "https://console.aws.amazon.com/acm/home?region=" + event['region'] + "#/?id=" + cert_id } }, 'Resources': [ { 'Id': event['id'], 'Type': 'ACM Certificate', 'Partition': 'aws', 'Region': event['region'] } ], 'Compliance': {'Status': 'WARNING'} }) # push any new findings to security hub if new_findings: try: response = sh_client.batch_import_findings(Findings=new_findings) if response['FailedCount'] > 0: print("Failed to import {} findings".format(response['FailedCount'])) except Exception as error: print("Error: ", error) raise return json.dumps(response) # function to setup the sh region def get_sh_region(event_region): # security hub findings may need to go to a different region so set that here if os.environ.get('SECURITY_HUB_REGION') is None: sh_region_local = event_region else: sh_region_local = os.environ['SECURITY_HUB_REGION'] return sh_region_local # quick function to trim off right side of a string def right(value, count): # To get right part of string, use negative first index in slice. return value[-count:]
-
在 Environment variables(环境变量)下,选择 Edit(编辑)并可选择添加以下变量。
-
(可选)EXPIRY_ DAYS
指定发送证书过期通知之前的准备时间(天数)。此函数默认为 45 天,但您可以指定自定义值。
-
(可选)SNS_ TOPIC _ ARN
为 Amazon 指定一个SNS。ARN以 arn: aw ARN s: sns 的格式提供完整内容:
<region>
:<account-number>
:<topic-name>
. -
(可选)SECURITY_ HUB _ REGION
在不同的区域 AWS Security Hub 中指定。如果未指定此选项,则使用正在运行的 Lambda 函数的区域。如果该函数在多个区域中运行,则可能需要将所有证书消息都转到单个区域中的 Security Hub。
-
-
在 Basic settings(基本设置)下,将 Timeout(超时)设置为 30 秒。
-
在页面顶部,选择 Deploy(部署)。
-
完成以下过程中的任务以开始使用此解决方案。
自动发送过期电子邮件通知
在此示例中,当通过 Amazon EventBridge 发起活动时,我们为每份即将到期的证书提供一封电子邮件。默认情况下,对于距离到期日不超过 45 天的证书,每天都会ACM引发一个事件。(此时间段可以使用. 的PutAccountConfiguration操作进行自定义ACMAPI。) 这些事件中的每一个都会触发以下级联的自动操作:
ACM raises Amazon EventBridge event → >>>>>>> events Event matches Amazon EventBridge rule → Rule calls Lambda function → Function sends SNS email and logs a Finding in Security Hub
-
创建 Lambda 函数并配置权限。(已完成 – 请参阅 设置 Lambda 函数和角色 IAM)。
-
为 Lambda 函数创建一个用于发送通知的标准SNS主题。有关更多信息,请参阅创建 Amazon SNS 主题。
-
为任何感兴趣的各方订阅新SNS主题。有关更多信息,请参阅订阅 Amazon SNS 主题。
-
创建用于触发 Lambda 函数的亚马逊 EventBridge 规则。有关更多信息,请参阅创建对事件做出反应的 Amazon EventBridge 规则。
在 Amazon EventBridge 控制台中 https://console.aws.amazon.com/events/
,导航至事件 > 规则页面,然后选择创建规则。指定服务名称、事件类型和 Lambda 函数。在事件模式预览编辑器中,粘贴以下代码: { "source": [ "aws.acm" ], "detail-type": [ "ACM Certificate Approaching Expiration" ] }
显示示例事件下会显示如 Lambda 接收这样的事件:
{ "version": "0", "id": "9c95e8e4-96a4-ef3f-b739-b6aa5b193afb", "detail-type": "ACM Certificate Approaching Expiration", "source": "aws.acm", "account": "123456789012", "time": "2020-09-30T06:51:08Z", "region": "us-east-1", "resources": [ "arn:aws:acm:us-east-1:123456789012:certificate/61f50cd4-45b9-4259-b049-d0a53682fa4b" ], "detail": { "DaysToExpiry": 31, "CommonName": "My Awesome Service" } }
清理
一旦您不再需要示例配置或任何配置,最佳实践是删除该配置的所有痕迹,以避免安全问题和以后的意外费用:
-
IAM策略和角色
-
Lambda 函数
-
CloudWatch 活动规则
-
CloudWatch 与 Lambda 关联的日志
-
SNS话题