Exemplos de funções do AWS Lambda para regras do AWS Config (Python) - AWS Config

Exemplos de funções do AWS Lambda para regras do AWS Config (Python)

O AWS Lambda executa funções em resposta a eventos publicados por serviços da AWS. A função para uma regra de lambda personalizada do AWS Config recebe um evento que é publicado pelo AWS Config. Em seguida, a função usa dados recebidos do evento, que foram recuperados da API do AWS Config, para avaliar a conformidade da regra. As operações em uma função para uma regra Config diferem conforme a avaliação seja acionada por alterações de configuração ou acionada periodicamente.

Para obter mais informações sobre padrões comuns nas funções do AWS Lambda, consulte Modelo de programação no Guia do desenvolvedor doAWS Lambda.

Exemplo de função para avaliações acionadas por alterações de configuração

O AWS Config chamará uma função como a do exemplo a seguir, quando detectar a alteração de configuração de um recurso dentro do escopo de uma regra personalizada.

Se você usar o console do AWS Config para criar uma regra que esteja associada a uma função, como neste exemplo, escolha Configuration changes (Alterações da configuração) como o tipo de trigger. Se você usar a API do AWS Config ou a AWS CLI para criar a regra, defina o atributo MessageType para ConfigurationItemChangeNotification e OversizedConfigurationItemChangeNotification. Essas configurações habilitam sua regra para ser acionada sempre que o AWS Config gerar um item de configuração ou um item de configuração superdimensionado como resultado de uma mudança de recurso.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
Operações de função

A função executa as seguintes operações em tempo de execução:

  1. A função é executada quando o AWS Lambda passa o objeto event para a função handler. Neste exemplo, a função aceita o parâmetro callback opcional, que ela usa para retornar informações para o chamador. O AWS Lambda também informa um objeto de context, que contém informações e métodos que a função pode usar enquanto é executada. Observe que nas versões mais recentes do Lambda, o contexto não é mais usado.

  2. A função verifica se messageType para o evento é um item de configuração ou um item de configuração superdimensionado, e, em seguida, retorna o item de configuração.

  3. O handler chama a função isApplicable para determinar se o recurso foi excluído.

    nota

    Os relatórios de regras sobre recursos excluídos devem retornar o resultado da avaliação de NOT_APPLICABLE, a fim de evitar avaliações desnecessárias de regras.

  4. O manipulador chama a função evaluateChangeNotificationCompliance e transmite os objetos configurationItem e ruleParameters e que o AWS Config publicou no evento.

    A função primeiro avalia se o recurso é uma instância EC2. Se o recurso não é uma instância EC2, a função retorna um valor de compatibilidade de NOT_APPLICABLE.

    Em seguida, a função avalia se o atributo instanceType no item de configuração é igual ao valor de parâmetro desiredInstanceType. Se os valores são iguais, a função retornará COMPLIANT. Se os valores não são iguais, a função retornará NON_COMPLIANT.

  5. O manipulador prepara o envio dos resultados da avaliação para o AWS Config, inicializando o objeto putEvaluationsRequest. Este objeto inclui o parâmetro Evaluations, que identifica o resultado de compatibilidade, o tipo de recurso e o ID do recurso que foi avaliado. O objeto putEvaluationsRequest também inclui o token de resultado do evento, que identifica a regra e o evento para o AWS Config.

  6. O manipulador envia os resultados da avaliação para o AWS Config, transmitindo o objeto para o método putEvaluations do cliente config.

Exemplo de função para avaliações periódicas

O AWS Config chamará uma função como a do exemplo a seguir para avaliações periódicas. As avaliações periódicas ocorrem com a frequência que você especifica ao definir a regra em AWS Config.

Se você usar o console do AWS Config para criar uma regra que esteja associada a uma função, como neste exemplo, escolha Periodic (Periódico) como o tipo de trigger. Se você usar a API do AWS Config ou a AWS CLI para criar a regra, defina o atributo MessageType como ScheduledNotification.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
Operações de função

A função executa as seguintes operações em tempo de execução:

  1. A função é executada quando o AWS Lambda passa o objeto event para a função handler. Neste exemplo, a função aceita o parâmetro callback opcional, que ela usa para retornar informações para o chamador. O AWS Lambda também informa um objeto de context, que contém informações e métodos que a função pode usar enquanto é executada. Observe que nas versões mais recentes do Lambda, o contexto não é mais usado.

  2. Para contar os recursos do tipo especificado, o handler chama a função countResourceTypes e passa o parâmetro applicableResourceType que recebeu do evento. A função countResourceTypes chama o método listDiscoveredResources do cliente config, que retorna uma lista de identificadores para os recursos aplicáveis. A função usa o comprimento da lista para determinar o número de recursos aplicáveis e retorna essa contagem para o handler.

  3. O manipulador prepara o envio dos resultados da avaliação para o AWS Config, inicializando o objeto putEvaluationsRequest. Este objeto inclui o parâmetro Evaluations, que identifica o resultado da conformidade e a Conta da AWS que foi publicada no evento. Você pode usar o parâmetro Evaluations para aplicar o resultado a qualquer tipo de recurso que tenha suporte no AWS Config. O objeto putEvaluationsRequest também inclui o token de resultado do evento, que identifica a regra e o evento para o AWS Config.

  4. Dentro do objeto putEvaluationsRequest, o handler chama a função evaluateCompliance. Esta função testa se o número de recursos aplicáveis excede o máximo atribuído ao parâmetro maxCount, que foi fornecido pelo evento. Se o número de recursos excede o máximo, a função retorna NON_COMPLIANT. Se o número de recursos não excede o máximo, a função retorna COMPLIANT.

  5. O manipulador envia os resultados da avaliação para o AWS Config, transmitindo o objeto para o método putEvaluations do cliente config.