AWS Lambda Funzioni di esempio per AWS Config le regole (Python) - AWS Config

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

AWS Lambda Funzioni di esempio per AWS Config le regole (Python)

AWS Lambda esegue funzioni in risposta a eventi pubblicati dai AWS servizi. La funzione per una regola Lambda AWS Config personalizzata riceve un evento pubblicato da AWS Config e utilizza quindi i dati ricevuti dall'evento e recuperati da AWS Config API per valutare la conformità della regola. Le operazioni in una funzione per una regola di configurazione variano a seconda che si esegua una valutazione attivata da modifiche di configurazione o attivata periodicamente.

Per informazioni sui modelli comuni all'interno AWS Lambda delle funzioni, consulta Programming Model nella AWS Lambda Developer Guide.

Funzione di esempio per le valutazioni attivate dalle modifiche di configurazione

AWS Config invocherà una funzione come nell'esempio seguente quando rileva una modifica alla configurazione per una risorsa che rientra nell'ambito di una regola personalizzata.

Se utilizzi la AWS Config console per creare una regola associata a una funzione come questo esempio, scegli Modifiche alla configurazione come tipo di trigger. Se usi AWS Config API o AWS CLI per creare la regola, imposta l'MessageTypeattributo su ConfigurationItemChangeNotification andOversizedConfigurationItemChangeNotification. Queste impostazioni consentono di attivare la regola ogni volta che viene AWS Config generato un elemento di configurazione o un elemento di configurazione di grandi dimensioni a seguito di una modifica delle risorse.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
Operazioni della funzione

La funzione esegue le operazioni seguenti in fase di runtime:

  1. La funzione viene eseguita quando AWS Lambda passa l'eventoggetto alla handler funzione. In questo esempio, la funzione accetta il callback parametro opzionale, che utilizza per restituire informazioni al chiamante. AWS Lambda passa anche un context oggetto, che contiene informazioni e metodi che la funzione può utilizzare durante l'esecuzione. Tieni presente che nelle versioni più recenti di Lambda il contesto non è più utilizzato.

  2. La funzione verifica che il messageType per l'evento sia un elemento di configurazione o un elemento di configurazione di dimensioni eccessive e quindi restituisce l'elemento di configurazione.

  3. Il gestore chiama la funzione isApplicable per determinare se la risorsa sia stato eliminata.

    Nota

    Le regole che segnalano le risorse eliminate devono restituire il risultato della valutazione di NOT_APPLICABLE per evitare valutazioni non necessarie delle regole.

  4. Il gestore chiama la evaluateChangeNotificationCompliance funzione e passa gli ruleParameters oggetti configurationItem and AWS Config pubblicati nell'evento.

    La funzione valuta innanzitutto se la risorsa è un'EC2istanza. Se la risorsa non è un'EC2istanza, la funzione restituisce un valore di conformità diNOT_APPLICABLE.

    La funzione valuta quindi se l'attributo instanceType nell'elemento di configurazione sia pari al valore del parametro desiredInstanceType. Se i valori sono uguali, la funzione restituisce COMPLIANT. Se i valori non sono uguali, la funzione restituisce NON_COMPLIANT.

  5. Il gestore si prepara a inviare i risultati della valutazione AWS Config inizializzando l'oggetto. putEvaluationsRequest Questo oggetto include il parametro Evaluations, che identifica il risultato di conformità, il tipo di risorsa e l'ID della risorsa che è stata valutata. L'putEvaluationsRequestoggetto include anche il token dei risultati dell'evento, che identifica la regola e l'evento per. AWS Config

  6. Il gestore invia i risultati della valutazione AWS Config passando l'oggetto al putEvaluations metodo del config client.

Funzione di esempio per valutazioni periodiche

AWS Config invocherà una funzione come nell'esempio seguente per le valutazioni periodiche. Le valutazioni periodiche si verificano con la frequenza che specifichi quando definisci la regola in AWS Config.

Se usi la AWS Config console per creare una regola associata a una funzione come questo esempio, scegli Periodic come tipo di trigger. Se usi AWS Config API o AWS CLI per creare la regola, imposta l'MessageTypeattributo ScheduledNotification su.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
Operazioni della funzione

La funzione esegue le operazioni seguenti in fase di runtime:

  1. La funzione viene eseguita quando AWS Lambda passa l'eventoggetto alla handler funzione. In questo esempio, la funzione accetta il callback parametro opzionale, che utilizza per restituire informazioni al chiamante. AWS Lambda passa anche un context oggetto, che contiene informazioni e metodi che la funzione può utilizzare durante l'esecuzione. Tieni presente che nelle versioni più recenti di Lambda il contesto non è più utilizzato.

  2. Per contare le risorse del tipo specificato, il gestore chiama la funzione countResourceTypes e trasferisce il parametro applicableResourceType che ha ricevuto dall'evento. La funzione countResourceTypes chiama il metodo listDiscoveredResources del client config, che restituisce un elenco di identificatori per le risorse applicabili. La funzione utilizza la lunghezza di questo elenco per determinare il numero di risorse applicabili e restituisce questo conteggio al gestore.

  3. Il gestore si prepara a inviare i risultati della valutazione AWS Config inizializzando l'oggetto. putEvaluationsRequest Questo oggetto include il Evaluations parametro, che identifica il risultato di conformità e Account AWS quello che è stato pubblicato nell'evento. Puoi utilizzare il parametro Evaluations per applicare il risultato a qualsiasi tipo di risorsa supportata da AWS Config. L'putEvaluationsRequestoggetto include anche il token dei risultati dell'evento, che identifica la regola e l'evento per. AWS Config

  4. All'interno dell'oggetto putEvaluationsRequest, il gestore chiama la funzione evaluateCompliance, Questa funzione verifica se il numero di risorse applicabili supera il massimo assegnato al parametro maxCount, che è stato fornito dall'evento. Se il numero di risorse supera il limite massimo, la funzione restituisce NON_COMPLIANT. Se il numero di risorse non supera il limite massimo, la funzione restituisce COMPLIANT.

  5. Il gestore invia i risultati della valutazione AWS Config passando l'oggetto al putEvaluations metodo del config client.