Exemplos de funções do AWS Lambda para regras do AWS Config (Python)
O AWS Lambda executa funções em resposta a eventos publicados por serviços da AWS. A função para uma regra de lambda personalizada do AWS Config recebe um evento que é publicado pelo AWS Config. Em seguida, a função usa dados recebidos do evento, que foram recuperados da API do AWS Config, para avaliar a conformidade da regra. As operações em uma função para uma regra Config diferem conforme a avaliação seja acionada por alterações de configuração ou acionada periodicamente.
Para obter mais informações sobre padrões comuns nas funções do AWS Lambda, consulte Modelo de programação no Guia do desenvolvedor doAWS Lambda.
Sumário
Exemplo de função para avaliações acionadas por alterações de configuração
O AWS Config chamará uma função como a do exemplo a seguir, quando detectar a alteração de configuração de um recurso dentro do escopo de uma regra personalizada.
Se você usar o console do AWS Config para criar uma regra que esteja associada a uma função, como neste exemplo, escolha Configuration changes (Alterações da configuração) como o tipo de trigger. Se você usar a API do AWS Config ou a AWS CLI para criar a regra, defina o atributo MessageType
para ConfigurationItemChangeNotification
e OversizedConfigurationItemChangeNotification
. Essas configurações habilitam sua regra para ser acionada sempre que o AWS Config gerar um item de configuração ou um item de configuração superdimensionado como resultado de uma mudança de recurso.
import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
Operações de função
A função executa as seguintes operações em tempo de execução:
-
A função é executada quando o AWS Lambda passa o objeto
event
para a funçãohandler
. Neste exemplo, a função aceita o parâmetrocallback
opcional, que ela usa para retornar informações para o chamador. O AWS Lambda também informa um objeto decontext
, que contém informações e métodos que a função pode usar enquanto é executada. Observe que nas versões mais recentes do Lambda, o contexto não é mais usado. -
A função verifica se
messageType
para o evento é um item de configuração ou um item de configuração superdimensionado, e, em seguida, retorna o item de configuração. -
O handler chama a função
isApplicable
para determinar se o recurso foi excluído.nota
Os relatórios de regras sobre recursos excluídos devem retornar o resultado da avaliação de
NOT_APPLICABLE
, a fim de evitar avaliações desnecessárias de regras. -
O manipulador chama a função
evaluateChangeNotificationCompliance
e transmite os objetosconfigurationItem
eruleParameters
e que o AWS Config publicou no evento.A função primeiro avalia se o recurso é uma instância EC2. Se o recurso não é uma instância EC2, a função retorna um valor de compatibilidade de
NOT_APPLICABLE
.Em seguida, a função avalia se o atributo
instanceType
no item de configuração é igual ao valor de parâmetrodesiredInstanceType
. Se os valores são iguais, a função retornaráCOMPLIANT
. Se os valores não são iguais, a função retornaráNON_COMPLIANT
. -
O manipulador prepara o envio dos resultados da avaliação para o AWS Config, inicializando o objeto
putEvaluationsRequest
. Este objeto inclui o parâmetroEvaluations
, que identifica o resultado de compatibilidade, o tipo de recurso e o ID do recurso que foi avaliado. O objetoputEvaluationsRequest
também inclui o token de resultado do evento, que identifica a regra e o evento para o AWS Config. -
O manipulador envia os resultados da avaliação para o AWS Config, transmitindo o objeto para o método
putEvaluations
do clienteconfig
.
Exemplo de função para avaliações periódicas
O AWS Config chamará uma função como a do exemplo a seguir para avaliações periódicas. As avaliações periódicas ocorrem com a frequência que você especifica ao definir a regra em AWS Config.
Se você usar o console do AWS Config para criar uma regra que esteja associada a uma função, como neste exemplo, escolha Periodic (Periódico) como o tipo de trigger. Se você usar a API do AWS Config ou a AWS CLI para criar a regra, defina o atributo MessageType
como ScheduledNotification
.
import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
Operações de função
A função executa as seguintes operações em tempo de execução:
-
A função é executada quando o AWS Lambda passa o objeto
event
para a funçãohandler
. Neste exemplo, a função aceita o parâmetrocallback
opcional, que ela usa para retornar informações para o chamador. O AWS Lambda também informa um objeto decontext
, que contém informações e métodos que a função pode usar enquanto é executada. Observe que nas versões mais recentes do Lambda, o contexto não é mais usado. -
Para contar os recursos do tipo especificado, o handler chama a função
countResourceTypes
e passa o parâmetroapplicableResourceType
que recebeu do evento. A funçãocountResourceTypes
chama o métodolistDiscoveredResources
do clienteconfig
, que retorna uma lista de identificadores para os recursos aplicáveis. A função usa o comprimento da lista para determinar o número de recursos aplicáveis e retorna essa contagem para o handler. -
O manipulador prepara o envio dos resultados da avaliação para o AWS Config, inicializando o objeto
putEvaluationsRequest
. Este objeto inclui o parâmetroEvaluations
, que identifica o resultado da conformidade e a Conta da AWS que foi publicada no evento. Você pode usar o parâmetroEvaluations
para aplicar o resultado a qualquer tipo de recurso que tenha suporte no AWS Config. O objetoputEvaluationsRequest
também inclui o token de resultado do evento, que identifica a regra e o evento para o AWS Config. -
Dentro do objeto
putEvaluationsRequest
, o handler chama a funçãoevaluateCompliance
. Esta função testa se o número de recursos aplicáveis excede o máximo atribuído ao parâmetromaxCount
, que foi fornecido pelo evento. Se o número de recursos excede o máximo, a função retornaNON_COMPLIANT
. Se o número de recursos não excede o máximo, a função retornaCOMPLIANT
. -
O manipulador envia os resultados da avaliação para o AWS Config, transmitindo o objeto para o método
putEvaluations
do clienteconfig
.