CloudWatch Logs examples using SDK for Python (Boto3) - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repository.

The following code examples show how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with CloudWatch Logs.

Actions are code excerpts from larger programs and must be run in context. While actions show how to call individual service functions, you can see the actions in context in their related scenarios.

Scenarios are code examples that show how to accomplish a specific task by calling multiple functions within the service or combined with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Actions

The following code example shows how to use GetQueryResults.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def _wait_for_query_results(self, client, query_id):
    """
    Waits for the query to complete and retrieves the results.

    :param query_id: The ID of the initiated query.
    :type query_id: str
    :return: A list containing the results of the query.
    :rtype: list
    """
    while True:
        time.sleep(1)
        results = client.get_query_results(queryId=query_id)
        if results["status"] in [
            "Complete",
            "Failed",
            "Cancelled",
            "Timeout",
            "Unknown",
        ]:
            return results.get("results", [])
  • For API details, see GetQueryResults in the AWS SDK for Python (Boto3) API Reference.
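
The excerpt above polls once per second until the query reaches one of the listed statuses, with no upper bound on the wait. As a minimal sketch of one way to bound that wait, the function below adds a deadline. The names `wait_for_query_results`, `timeout_seconds`, `poll_interval`, and `FakeLogsClient` are assumptions made for this illustration and are not part of the AWS example; the fake client stands in for `boto3.client("logs")` so the sketch runs without AWS credentials.

```python
import time

# Statuses that the excerpt above treats as final for a query.
TERMINAL_STATUSES = {"Complete", "Failed", "Cancelled", "Timeout", "Unknown"}


def wait_for_query_results(client, query_id, timeout_seconds=60.0, poll_interval=1.0):
    """Poll get_query_results until a final status or until the deadline passes.

    `client` is any object exposing get_query_results(queryId=...), such as
    boto3.client("logs"). Raises TimeoutError if the deadline passes first.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = client.get_query_results(queryId=query_id)
        if response["status"] in TERMINAL_STATUSES:
            return response["status"], response.get("results", [])
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Query {query_id} is still {response['status']}")
        time.sleep(poll_interval)


class FakeLogsClient:
    """Hypothetical stand-in for the Boto3 client, used only for this demo."""

    def __init__(self, responses):
        self._responses = iter(responses)

    def get_query_results(self, queryId):
        # Return the next canned response regardless of the query ID.
        return next(self._responses)


fake = FakeLogsClient(
    [
        {"status": "Running", "results": []},
        {"status": "Complete", "results": [[{"field": "@message", "value": "hello"}]]},
    ]
)
status, results = wait_for_query_results(fake, "demo-id", poll_interval=0.01)
print(status, results)
```

Returning the status together with the results lets a caller distinguish a completed query from a failed or cancelled one, which the original excerpt does not report.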

The following code example shows how to use StartLiveTail.

SDK for Python (Boto3)

Include the required files.

import boto3
import time
from datetime import datetime

Start the Live Tail session.

# Initialize the client
client = boto3.client('logs')

start_time = time.time()

# log_group_identifiers, log_streams, and filter_pattern are supplied by the caller.
try:
    response = client.start_live_tail(
        logGroupIdentifiers=log_group_identifiers,
        logStreamNames=log_streams,
        logEventFilterPattern=filter_pattern
    )
    event_stream = response['responseStream']
    # Handle the events streamed back in the response
    for event in event_stream:
        # Set a timeout to close the stream.
        # This will end the Live Tail session.
        if (time.time() - start_time >= 10):
            event_stream.close()
            break
        # Handle when session is started
        if 'sessionStart' in event:
            session_start_event = event['sessionStart']
            print(session_start_event)
        # Handle when log event is given in a session update
        elif 'sessionUpdate' in event:
            log_events = event['sessionUpdate']['sessionResults']
            for log_event in log_events:
                print('[{date}] {log}'.format(date=datetime.fromtimestamp(log_event['timestamp']/1000),log=log_event['message']))
        else:
            # On-stream exceptions are captured here
            raise RuntimeError(str(event))
except Exception as e:
    print(e)
  • For API details, see StartLiveTail in the AWS SDK for Python (Boto3) API Reference.
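
The event handling in the loop above can be separated from the network call so that it is easy to try out on sample data. The following is a minimal sketch under that assumption: `format_log_event` and `handle_event` are hypothetical helper names, the sample event is hand-written rather than captured from a real session, and timestamps are rendered in UTC instead of local time.

```python
from datetime import datetime, timezone


def format_log_event(log_event):
    """Render one sessionResults entry as '[timestamp] message'."""
    # The excerpt above divides the timestamp by 1000, so it is in milliseconds.
    when = datetime.fromtimestamp(log_event["timestamp"] / 1000, tz=timezone.utc)
    return f"[{when.isoformat()}] {log_event['message']}"


def handle_event(event):
    """Return printable lines for one Live Tail stream event."""
    if "sessionStart" in event:
        return [f"session started: {event['sessionStart']}"]
    if "sessionUpdate" in event:
        return [format_log_event(e) for e in event["sessionUpdate"]["sessionResults"]]
    # Anything else is treated as an on-stream exception, as in the excerpt.
    raise RuntimeError(str(event))


# Hand-written sample event that mimics the shape used in the excerpt.
sample = {
    "sessionUpdate": {
        "sessionResults": [{"timestamp": 1700000000000, "message": "ok"}]
    }
}
lines = handle_event(sample)
for line in lines:
    print(line)
```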

The following code example shows how to use StartQuery.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def perform_query(self, date_range):
    """
    Performs the actual CloudWatch log query.

    :param date_range: A tuple representing the start and end datetime for the query.
    :type date_range: tuple
    :return: A list containing the query results.
    :rtype: list
    """
    client = boto3.client("logs")
    try:
        try:
            start_time = round(
                self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[0])
            )
            end_time = round(
                self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[1])
            )
            response = client.start_query(
                logGroupName=self.log_groups,
                startTime=start_time,
                endTime=end_time,
                queryString="fields @timestamp, @message | sort @timestamp asc",
                limit=self.limit,
            )
            query_id = response["queryId"]
        except client.exceptions.ResourceNotFoundException as e:
            raise DateOutOfBoundsError(f"Resource not found: {e}")
        while True:
            time.sleep(1)
            results = client.get_query_results(queryId=query_id)
            if results["status"] in [
                "Complete",
                "Failed",
                "Cancelled",
                "Timeout",
                "Unknown",
            ]:
                return results.get("results", [])
    except DateOutOfBoundsError:
        return []

def _initiate_query(self, client, date_range, max_logs):
    """
    Initiates the CloudWatch logs query.

    :param date_range: A tuple representing the start and end datetime for the query.
    :type date_range: tuple
    :param max_logs: The maximum number of logs to retrieve.
    :type max_logs: int
    :return: The query ID as a string.
    :rtype: str
    """
    try:
        start_time = round(
            self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[0])
        )
        end_time = round(
            self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[1])
        )
        response = client.start_query(
            logGroupName=self.log_groups,
            startTime=start_time,
            endTime=end_time,
            queryString="fields @timestamp, @message | sort @timestamp asc",
            limit=max_logs,
        )
        return response["queryId"]
    except client.exceptions.ResourceNotFoundException as e:
        raise DateOutOfBoundsError(f"Resource not found: {e}")
  • For API details, see StartQuery in the AWS SDK for Python (Boto3) API Reference.
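
The excerpt above relies on a `date_utilities` helper that is not shown on this page. As a rough sketch of what the conversion step might look like, the code below parses ISO 8601 strings and assembles the keyword arguments passed to `start_query`. The body of `convert_iso8601_to_unix_timestamp`, the helper `build_start_query_args`, and the choice to treat timestamps without an offset as UTC are all assumptions for illustration, not the AWS example's implementation.

```python
from datetime import datetime, timezone

QUERY_STRING = "fields @timestamp, @message | sort @timestamp asc"


def convert_iso8601_to_unix_timestamp(iso_string):
    """Parse an ISO 8601 string and return seconds since the Unix epoch."""
    parsed = datetime.fromisoformat(iso_string)
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are treated as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()


def build_start_query_args(log_group, date_range, limit=10000):
    """Assemble keyword arguments for client.start_query(**args)."""
    start_time = round(convert_iso8601_to_unix_timestamp(date_range[0]))
    end_time = round(convert_iso8601_to_unix_timestamp(date_range[1]))
    if start_time >= end_time:
        raise ValueError("date_range must start before it ends")
    return {
        "logGroupName": log_group,
        "startTime": start_time,
        "endTime": end_time,
        "queryString": QUERY_STRING,
        "limit": limit,
    }


args = build_start_query_args(
    "/workflows/cloudwatch-logs/large-query",
    ("2023-11-14T22:13:20+00:00", "2023-11-14T23:13:20+00:00"),
)
print(args)
```

With a real client, the dictionary would be passed as `client.start_query(**args)`, and the returned `queryId` would then be polled with GetQueryResults.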

Scenarios

The following code example shows how to use CloudWatch Logs to query more than 10,000 records.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

This file invokes an example module for managing CloudWatch queries that exceed 10,000 results.

import logging
import os
import sys

import boto3
from botocore.config import Config

from cloudwatch_query import CloudWatchQuery
from date_utilities import DateUtilities

# Configure logging at the module level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
)


class CloudWatchLogsQueryRunner:
    def __init__(self):
        """
        Initializes the CloudWatchLogsQueryRunner class by setting up date utilities
        and creating a CloudWatch Logs client with retry configuration.
        """
        self.date_utilities = DateUtilities()
        self.cloudwatch_logs_client = self.create_cloudwatch_logs_client()

    def create_cloudwatch_logs_client(self):
        """
        Creates and returns a CloudWatch Logs client with a specified retry configuration.

        :return: A CloudWatch Logs client instance.
        :rtype: boto3.client
        """
        try:
            return boto3.client("logs", config=Config(retries={"max_attempts": 10}))
        except Exception as e:
            logging.error(f"Failed to create CloudWatch Logs client: {e}")
            sys.exit(1)

    def fetch_environment_variables(self):
        """
        Fetches and validates required environment variables for query start and end dates.

        :return: Tuple of query start date and end date as integers.
        :rtype: tuple
        :raises SystemExit: If required environment variables are missing or invalid.
        """
        try:
            query_start_date = int(os.environ["QUERY_START_DATE"])
            query_end_date = int(os.environ["QUERY_END_DATE"])
        except KeyError:
            logging.error(
                "Both QUERY_START_DATE and QUERY_END_DATE environment variables are required."
            )
            sys.exit(1)
        except ValueError as e:
            logging.error(f"Error parsing date environment variables: {e}")
            sys.exit(1)

        return query_start_date, query_end_date

    def convert_dates_to_iso8601(self, start_date, end_date):
        """
        Converts UNIX timestamp dates to ISO 8601 format using DateUtilities.

        :param start_date: The start date in UNIX timestamp.
        :type start_date: int
        :param end_date: The end date in UNIX timestamp.
        :type end_date: int
        :return: Start and end dates in ISO 8601 format.
        :rtype: tuple
        """
        start_date_iso8601 = self.date_utilities.convert_unix_timestamp_to_iso8601(
            start_date
        )
        end_date_iso8601 = self.date_utilities.convert_unix_timestamp_to_iso8601(
            end_date
        )
        return start_date_iso8601, end_date_iso8601

    def execute_query(
        self,
        start_date_iso8601,
        end_date_iso8601,
        log_group="/workflows/cloudwatch-logs/large-query",
    ):
        """
        Creates a CloudWatchQuery instance and executes the query with provided date range.

        :param start_date_iso8601: The start date in ISO 8601 format.
        :type start_date_iso8601: str
        :param end_date_iso8601: The end date in ISO 8601 format.
        :type end_date_iso8601: str
        :param log_group: Log group to search: "/workflows/cloudwatch-logs/large-query"
        :type log_group: str
        """
        cloudwatch_query = CloudWatchQuery(
            [start_date_iso8601, end_date_iso8601],
        )
        cloudwatch_query.query_logs((start_date_iso8601, end_date_iso8601))
        logging.info("Query executed successfully.")
        logging.info(
            f"Queries completed in {cloudwatch_query.query_duration} seconds. Total logs found: {len(cloudwatch_query.query_results)}"
        )


def main():
    """
    Main function to start a recursive CloudWatch logs query.
    Fetches required environment variables, converts dates, and executes the query.
    """
    logging.info("Starting a recursive CloudWatch logs query...")
    runner = CloudWatchLogsQueryRunner()
    query_start_date, query_end_date = runner.fetch_environment_variables()
    start_date_iso8601 = DateUtilities.convert_unix_timestamp_to_iso8601(
        query_start_date
    )
    end_date_iso8601 = DateUtilities.convert_unix_timestamp_to_iso8601(query_end_date)
    runner.execute_query(start_date_iso8601, end_date_iso8601)


if __name__ == "__main__":
    main()

This module processes CloudWatch queries that exceed 10,000 results.

import logging
import time
from datetime import datetime
import threading

import boto3

from date_utilities import DateUtilities


class DateOutOfBoundsError(Exception):
    """Exception raised when the date range for a query is out of bounds."""

    pass


class CloudWatchQuery:
    """
    A class to query AWS CloudWatch logs within a specified date range.

    :ivar date_range: Start and end datetime for the query.
    :vartype date_range: tuple
    :ivar limit: Maximum number of log entries to return.
    :vartype limit: int
    """

    def __init__(self, date_range):
        self.lock = threading.Lock()
        self.log_groups = "/workflows/cloudwatch-logs/large-query"
        self.query_results = []
        self.date_range = date_range
        self.query_duration = None
        self.datetime_format = "%Y-%m-%d %H:%M:%S.%f"
        self.date_utilities = DateUtilities()
        self.limit = 10000

    def query_logs(self, date_range):
        """
        Executes a CloudWatch logs query for a specified date range and calculates the execution time of the query.

        :return: A batch of logs retrieved from the CloudWatch logs query.
        :rtype: list
        """
        start_time = datetime.now()

        start_date, end_date = self.date_utilities.normalize_date_range_format(
            date_range, from_format="unix_timestamp", to_format="datetime"
        )

        logging.info(
            f"Original query:"
            f"\n START: {start_date}"
            f"\n END: {end_date}"
        )
        self.recursive_query((start_date, end_date))
        end_time = datetime.now()
        self.query_duration = (end_time - start_time).total_seconds()

    def recursive_query(self, date_range):
        """
        Processes logs within a given date range, fetching batches of logs recursively if necessary.

        :param date_range: The date range to fetch logs for, specified as a tuple (start_timestamp, end_timestamp).
        :type date_range: tuple
        :return: None if the recursive fetching is continued or stops when the final batch of logs is processed.
                 Although it doesn't explicitly return the query results, this method accumulates all fetched logs
                 in the `self.query_results` attribute.
        :rtype: None
        """
        batch_of_logs = self.perform_query(date_range)
        # Add the batch to the accumulated logs
        with self.lock:
            self.query_results.extend(batch_of_logs)
        if len(batch_of_logs) == self.limit:
            logging.info(f"Fetched {self.limit}, checking for more...")
            most_recent_log = self.find_most_recent_log(batch_of_logs)
            most_recent_log_timestamp = next(
                item["value"]
                for item in most_recent_log
                if item["field"] == "@timestamp"
            )
            new_range = (most_recent_log_timestamp, date_range[1])
            midpoint = self.date_utilities.find_middle_time(new_range)

            # Query the two halves of the remaining range in parallel threads.
            first_half_thread = threading.Thread(
                target=self.recursive_query,
                args=((most_recent_log_timestamp, midpoint),),
            )
            second_half_thread = threading.Thread(
                target=self.recursive_query, args=((midpoint, date_range[1]),)
            )

            first_half_thread.start()
            second_half_thread.start()

            first_half_thread.join()
            second_half_thread.join()

    def find_most_recent_log(self, logs):
        """
        Search a list of log items and return most recent log entry.

        :param logs: A list of logs to analyze.
        :return: The most recent log entry, as a list containing log item details.
        :rtype: list
        """
        most_recent_log = None
        most_recent_date = "1970-01-01 00:00:00.000"

        for log in logs:
            for item in log:
                if item["field"] == "@timestamp":
                    logging.debug(f"Compared: {item['value']} to {most_recent_date}")
                    if (
                        self.date_utilities.compare_dates(
                            item["value"], most_recent_date
                        )
                        == item["value"]
                    ):
                        logging.debug(f"New most recent: {item['value']}")
                        most_recent_date = item["value"]
                        most_recent_log = log
        logging.info(f"Most recent log date of batch: {most_recent_date}")
        return most_recent_log

    def perform_query(self, date_range):
        """
        Performs the actual CloudWatch log query.

        :param date_range: A tuple representing the start and end datetime for the query.
        :type date_range: tuple
        :return: A list containing the query results.
        :rtype: list
        """
        client = boto3.client("logs")
        try:
            try:
                start_time = round(
                    self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[0])
                )
                end_time = round(
                    self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[1])
                )
                response = client.start_query(
                    logGroupName=self.log_groups,
                    startTime=start_time,
                    endTime=end_time,
                    queryString="fields @timestamp, @message | sort @timestamp asc",
                    limit=self.limit,
                )
                query_id = response["queryId"]
            except client.exceptions.ResourceNotFoundException as e:
                raise DateOutOfBoundsError(f"Resource not found: {e}")
            while True:
                time.sleep(1)
                results = client.get_query_results(queryId=query_id)
                if results["status"] in [
                    "Complete",
                    "Failed",
                    "Cancelled",
                    "Timeout",
                    "Unknown",
                ]:
                    return results.get("results", [])
        except DateOutOfBoundsError:
            return []

    def _initiate_query(self, client, date_range, max_logs):
        """
        Initiates the CloudWatch logs query.

        :param date_range: A tuple representing the start and end datetime for the query.
        :type date_range: tuple
        :param max_logs: The maximum number of logs to retrieve.
        :type max_logs: int
        :return: The query ID as a string.
        :rtype: str
        """
        try:
            start_time = round(
                self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[0])
            )
            end_time = round(
                self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[1])
            )
            response = client.start_query(
                logGroupName=self.log_groups,
                startTime=start_time,
                endTime=end_time,
                queryString="fields @timestamp, @message | sort @timestamp asc",
                limit=max_logs,
            )
            return response["queryId"]
        except client.exceptions.ResourceNotFoundException as e:
            raise DateOutOfBoundsError(f"Resource not found: {e}")

    def _wait_for_query_results(self, client, query_id):
        """
        Waits for the query to complete and retrieves the results.

        :param query_id: The ID of the initiated query.
        :type query_id: str
        :return: A list containing the results of the query.
        :rtype: list
        """
        while True:
            time.sleep(1)
            results = client.get_query_results(queryId=query_id)
            if results["status"] in [
                "Complete",
                "Failed",
                "Cancelled",
                "Timeout",
                "Unknown",
            ]:
                return results.get("results", [])
  • For API details, see GetQueryResults and StartQuery in the AWS SDK for Python (Boto3) API Reference.
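
The module above works around the per-query result limit by checking whether a batch is full and, if so, splitting the rest of the time range in two and querying each half again. The sketch below reproduces only that splitting idea on in-memory data so it can be run without AWS. It makes several simplifying assumptions: timestamps are unique integers in ascending order, `fake_query` is a hypothetical stand-in for StartQuery plus GetQueryResults, the halves run sequentially rather than in threads, and a set is used to absorb the overlap at the newest entry of each batch.

```python
def fake_query(logs, start, end, limit):
    """Hypothetical stand-in for a log query.

    Returns at most `limit` timestamps from the ascending list `logs`
    that fall within the inclusive range [start, end].
    """
    return [t for t in logs if start <= t <= end][:limit]


def collect_all(logs, start, end, limit, found=None):
    """Gather every timestamp in [start, end] when each query is capped at `limit`."""
    found = set() if found is None else found
    batch = fake_query(logs, start, end, limit)
    found.update(batch)
    if len(batch) == limit and start < end:
        # A full batch may have been truncated. Continue from its newest entry
        # and split the remaining range at the midpoint.
        newest = max(batch)
        midpoint = (newest + end) // 2
        collect_all(logs, newest, midpoint, limit, found)
        collect_all(logs, midpoint + 1, end, limit, found)
    return found


# 34 sample timestamps, queried with a cap of 5 results per query.
sample_logs = list(range(0, 100, 3))
collected = collect_all(sample_logs, 0, 99, limit=5)
print(len(collected), "of", len(sample_logs), "entries collected")
```

Starting the second half at `midpoint + 1` keeps the two halves disjoint and guarantees that each recursive call covers a strictly smaller range, so the recursion ends even when a batch is full.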