AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 CloudWatch 日志运行大型查询
以下代码示例展示了如何使用 CloudWatch 日志查询超过 10,000 条记录。
- JavaScript
-
- SDK对于 JavaScript (v3)
-
注意
还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 这是入口点。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { CloudWatchLogsClient } from "@aws-sdk/client-cloudwatch-logs"; import { CloudWatchQuery } from "./cloud-watch-query.js"; console.log("Starting a recursive query..."); if (!process.env.QUERY_START_DATE || !process.env.QUERY_END_DATE) { throw new Error( "QUERY_START_DATE and QUERY_END_DATE environment variables are required.", ); } const cloudWatchQuery = new CloudWatchQuery(new CloudWatchLogsClient({}), { logGroupNames: ["/workflows/cloudwatch-logs/large-query"], dateRange: [ new Date(Number.parseInt(process.env.QUERY_START_DATE)), new Date(Number.parseInt(process.env.QUERY_END_DATE)), ], }); await cloudWatchQuery.run(); console.log( `Queries finished in ${cloudWatchQuery.secondsElapsed} seconds.\nTotal logs found: ${cloudWatchQuery.results.length}`, );
该类可在必要时将查询拆分为多个步骤。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { StartQueryCommand, GetQueryResultsCommand, } from "@aws-sdk/client-cloudwatch-logs"; import { splitDateRange } from "@aws-doc-sdk-examples/lib/utils/util-date.js"; import { retry } from "@aws-doc-sdk-examples/lib/utils/util-timers.js"; class DateOutOfBoundsError extends Error {} export class CloudWatchQuery { /** * Run a query for all CloudWatch Logs within a certain date range. * CloudWatch logs return a max of 10,000 results. This class * performs a binary search across all of the logs in the provided * date range if a query returns the maximum number of results. * * @param {import('@aws-sdk/client-cloudwatch-logs').CloudWatchLogsClient} client * @param {{ logGroupNames: string[], dateRange: [Date, Date], queryConfig: { limit: number } }} config */ constructor(client, { logGroupNames, dateRange, queryConfig }) { this.client = client; /** * All log groups are queried. */ this.logGroupNames = logGroupNames; /** * The inclusive date range that is queried. */ this.dateRange = dateRange; /** * CloudWatch Logs never returns more than 10,000 logs. */ this.limit = queryConfig?.limit ?? 10000; /** * @type {import("@aws-sdk/client-cloudwatch-logs").ResultField[][]} */ this.results = []; } /** * Run the query. */ async run() { this.secondsElapsed = 0; const start = new Date(); this.results = await this._largeQuery(this.dateRange); const end = new Date(); this.secondsElapsed = (end - start) / 1000; return this.results; } /** * Recursively query for logs. * @param {[Date, Date]} dateRange * @returns {Promise<import("@aws-sdk/client-cloudwatch-logs").ResultField[][]>} */ async _largeQuery(dateRange) { const logs = await this._query(dateRange, this.limit); console.log( `Query date range: ${dateRange .map((d) => d.toISOString()) .join(" to ")}. Found ${logs.length} logs.`, ); if (logs.length < this.limit) { return logs; } const lastLogDate = this._getLastLogDate(logs); const offsetLastLogDate = new Date(lastLogDate); offsetLastLogDate.setMilliseconds(lastLogDate.getMilliseconds() + 1); const subDateRange = [offsetLastLogDate, dateRange[1]]; const [r1, r2] = splitDateRange(subDateRange); const results = await Promise.all([ this._largeQuery(r1), this._largeQuery(r2), ]); return [logs, ...results].flat(); } /** * Find the most recent log in a list of logs. * @param {import("@aws-sdk/client-cloudwatch-logs").ResultField[][]} logs */ _getLastLogDate(logs) { const timestamps = logs .map( (log) => log.find((fieldMeta) => fieldMeta.field === "@timestamp")?.value, ) .filter((t) => !!t) .map((t) => `${t}Z`) .sort(); if (!timestamps.length) { throw new Error("No timestamp found in logs."); } return new Date(timestamps[timestamps.length - 1]); } /** * Simple wrapper for the GetQueryResultsCommand. * @param {string} queryId */ _getQueryResults(queryId) { return this.client.send(new GetQueryResultsCommand({ queryId })); } /** * Starts a query and waits for it to complete. * @param {[Date, Date]} dateRange * @param {number} maxLogs */ async _query(dateRange, maxLogs) { try { const { queryId } = await this._startQuery(dateRange, maxLogs); const { results } = await this._waitUntilQueryDone(queryId); return results ?? []; } catch (err) { /** * This error is thrown when StartQuery returns an error indicating * that the query's start or end date occur before the log group was * created. */ if (err instanceof DateOutOfBoundsError) { return []; } throw err; } } /** * Wrapper for the StartQueryCommand. Uses a static query string * for consistency. * @param {[Date, Date]} dateRange * @param {number} maxLogs * @returns {Promise<{ queryId: string }>} */ async _startQuery([startDate, endDate], maxLogs = 10000) { try { return await this.client.send( new StartQueryCommand({ logGroupNames: this.logGroupNames, queryString: "fields @timestamp, @message | sort @timestamp asc", startTime: startDate.valueOf(), endTime: endDate.valueOf(), limit: maxLogs, }), ); } catch (err) { /** @type {string} */ const message = err.message; if (message.startsWith("Query's end date and time")) { // This error indicates that the query's start or end date occur // before the log group was created. throw new DateOutOfBoundsError(message); } throw err; } } /** * Call GetQueryResultsCommand until the query is done. * @param {string} queryId */ _waitUntilQueryDone(queryId) { const getResults = async () => { const results = await this._getQueryResults(queryId); const queryDone = [ "Complete", "Failed", "Cancelled", "Timeout", "Unknown", ].includes(results.status); return { queryDone, results }; }; return retry( { intervalInMs: 1000, maxRetries: 60, quiet: true }, async () => { const { queryDone, results } = await getResults(); if (!queryDone) { throw new Error("Query not done."); } return results; }, ); } }
-
有关API详细信息,请参阅 “参AWS SDK for JavaScript API考” 中的以下主题。
-
- Python
-
- SDK适用于 Python (Boto3)
-
注意
还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 此文件调用一个示例模块,用于管理超过 10,000 个结果的 CloudWatch 查询。
import logging import os import sys import boto3 from botocore.config import Config from cloudwatch_query import CloudWatchQuery from date_utilities import DateUtilities # Configure logging at the module level. logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s", ) class CloudWatchLogsQueryRunner: def __init__(self): """ Initializes the CloudWatchLogsQueryRunner class by setting up date utilities and creating a CloudWatch Logs client with retry configuration. """ self.date_utilities = DateUtilities() self.cloudwatch_logs_client = self.create_cloudwatch_logs_client() def create_cloudwatch_logs_client(self): """ Creates and returns a CloudWatch Logs client with a specified retry configuration. :return: A CloudWatch Logs client instance. :rtype: boto3.client """ try: return boto3.client("logs", config=Config(retries={"max_attempts": 10})) except Exception as e: logging.error(f"Failed to create CloudWatch Logs client: {e}") sys.exit(1) def fetch_environment_variables(self): """ Fetches and validates required environment variables for query start and end dates. :return: Tuple of query start date and end date as integers. :rtype: tuple :raises SystemExit: If required environment variables are missing or invalid. """ try: query_start_date = int(os.environ["QUERY_START_DATE"]) query_end_date = int(os.environ["QUERY_END_DATE"]) except KeyError: logging.error( "Both QUERY_START_DATE and QUERY_END_DATE environment variables are required." ) sys.exit(1) except ValueError as e: logging.error(f"Error parsing date environment variables: {e}") sys.exit(1) return query_start_date, query_end_date def convert_dates_to_iso8601(self, start_date, end_date): """ Converts UNIX timestamp dates to ISO 8601 format using DateUtilities. :param start_date: The start date in UNIX timestamp. :type start_date: int :param end_date: The end date in UNIX timestamp. :type end_date: int :return: Start and end dates in ISO 8601 format. :rtype: tuple """ start_date_iso8601 = self.date_utilities.convert_unix_timestamp_to_iso8601( start_date ) end_date_iso8601 = self.date_utilities.convert_unix_timestamp_to_iso8601( end_date ) return start_date_iso8601, end_date_iso8601 def execute_query( self, start_date_iso8601, end_date_iso8601, log_group="/workflows/cloudwatch-logs/large-query", ): """ Creates a CloudWatchQuery instance and executes the query with provided date range. :param start_date_iso8601: The start date in ISO 8601 format. :type start_date_iso8601: str :param end_date_iso8601: The end date in ISO 8601 format. :type end_date_iso8601: str :param log_group: Log group to search: "/workflows/cloudwatch-logs/large-query" :type log_group: str """ cloudwatch_query = CloudWatchQuery( [start_date_iso8601, end_date_iso8601], ) cloudwatch_query.query_logs((start_date_iso8601, end_date_iso8601)) logging.info("Query executed successfully.") logging.info( f"Queries completed in {cloudwatch_query.query_duration} seconds. Total logs found: {len(cloudwatch_query.query_results)}" ) def main(): """ Main function to start a recursive CloudWatch logs query. Fetches required environment variables, converts dates, and executes the query. """ logging.info("Starting a recursive CloudWatch logs query...") runner = CloudWatchLogsQueryRunner() query_start_date, query_end_date = runner.fetch_environment_variables() start_date_iso8601 = DateUtilities.convert_unix_timestamp_to_iso8601( query_start_date ) end_date_iso8601 = DateUtilities.convert_unix_timestamp_to_iso8601(query_end_date) runner.execute_query(start_date_iso8601, end_date_iso8601) if __name__ == "__main__": main()
此模块处理超过 10,000 个结果的 CloudWatch 查询。
import logging import time from datetime import datetime import threading import boto3 from date_utilities import DateUtilities class DateOutOfBoundsError(Exception): """Exception raised when the date range for a query is out of bounds.""" pass class CloudWatchQuery: """ A class to query AWS CloudWatch logs within a specified date range. :ivar date_range: Start and end datetime for the query. :vartype date_range: tuple :ivar limit: Maximum number of log entries to return. :vartype limit: int """ def __init__(self, date_range): self.lock = threading.Lock() self.log_groups = "/workflows/cloudwatch-logs/large-query" self.query_results = [] self.date_range = date_range self.query_duration = None self.datetime_format = "%Y-%m-%d %H:%M:%S.%f" self.date_utilities = DateUtilities() self.limit = 10000 def query_logs(self, date_range): """ Executes a CloudWatch logs query for a specified date range and calculates the execution time of the query. :return: A batch of logs retrieved from the CloudWatch logs query. :rtype: list """ start_time = datetime.now() start_date, end_date = self.date_utilities.normalize_date_range_format( date_range, from_format="unix_timestamp", to_format="datetime" ) logging.info( f"Original query:" f"\n START: {start_date}" f"\n END: {end_date}" ) self.recursive_query((start_date, end_date)) end_time = datetime.now() self.query_duration = (end_time - start_time).total_seconds() def recursive_query(self, date_range): """ Processes logs within a given date range, fetching batches of logs recursively if necessary. :param date_range: The date range to fetch logs for, specified as a tuple (start_timestamp, end_timestamp). :type date_range: tuple :return: None if the recursive fetching is continued or stops when the final batch of logs is processed. Although it doesn't explicitly return the query results, this method accumulates all fetched logs in the `self.query_results` attribute. :rtype: None """ batch_of_logs = self.perform_query(date_range) # Add the batch to the accumulated logs with self.lock: self.query_results.extend(batch_of_logs) if len(batch_of_logs) == self.limit: logging.info(f"Fetched {self.limit}, checking for more...") most_recent_log = self.find_most_recent_log(batch_of_logs) most_recent_log_timestamp = next( item["value"] for item in most_recent_log if item["field"] == "@timestamp" ) new_range = (most_recent_log_timestamp, date_range[1]) midpoint = self.date_utilities.find_middle_time(new_range) first_half_thread = threading.Thread( target=self.recursive_query, args=((most_recent_log_timestamp, midpoint),), ) second_half_thread = threading.Thread( target=self.recursive_query, args=((midpoint, date_range[1]),) ) first_half_thread.start() second_half_thread.start() first_half_thread.join() second_half_thread.join() def find_most_recent_log(self, logs): """ Search a list of log items and return most recent log entry. :param logs: A list of logs to analyze. :return: log :type :return List containing log item details """ most_recent_log = None most_recent_date = "1970-01-01 00:00:00.000" for log in logs: for item in log: if item["field"] == "@timestamp": logging.debug(f"Compared: {item['value']} to {most_recent_date}") if ( self.date_utilities.compare_dates( item["value"], most_recent_date ) == item["value"] ): logging.debug(f"New most recent: {item['value']}") most_recent_date = item["value"] most_recent_log = log logging.info(f"Most recent log date of batch: {most_recent_date}") return most_recent_log def perform_query(self, date_range): """ Performs the actual CloudWatch log query. :param date_range: A tuple representing the start and end datetime for the query. :type date_range: tuple :return: A list containing the query results. :rtype: list """ client = boto3.client("logs") try: try: start_time = round( self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[0]) ) end_time = round( self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[1]) ) response = client.start_query( logGroupName=self.log_groups, startTime=start_time, endTime=end_time, queryString="fields @timestamp, @message | sort @timestamp asc", limit=self.limit, ) query_id = response["queryId"] except client.exceptions.ResourceNotFoundException as e: raise DateOutOfBoundsError(f"Resource not found: {e}") while True: time.sleep(1) results = client.get_query_results(queryId=query_id) if results["status"] in [ "Complete", "Failed", "Cancelled", "Timeout", "Unknown", ]: return results.get("results", []) except DateOutOfBoundsError: return [] def _initiate_query(self, client, date_range, max_logs): """ Initiates the CloudWatch logs query. :param date_range: A tuple representing the start and end datetime for the query. :type date_range: tuple :param max_logs: The maximum number of logs to retrieve. :type max_logs: int :return: The query ID as a string. :rtype: str """ try: start_time = round( self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[0]) ) end_time = round( self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[1]) ) response = client.start_query( logGroupName=self.log_groups, startTime=start_time, endTime=end_time, queryString="fields @timestamp, @message | sort @timestamp asc", limit=max_logs, ) return response["queryId"] except client.exceptions.ResourceNotFoundException as e: raise DateOutOfBoundsError(f"Resource not found: {e}") def _wait_for_query_results(self, client, query_id): """ Waits for the query to complete and retrieves the results. :param query_id: The ID of the initiated query. :type query_id: str :return: A list containing the results of the query. :rtype: list """ while True: time.sleep(1) results = client.get_query_results(queryId=query_id) if results["status"] in [ "Complete", "Failed", "Cancelled", "Timeout", "Unknown", ]: return results.get("results", [])
-
有关API详细信息,请参阅 Python (Boto3) API 参考中的AWS SDK以下主题。
-
场景
使用计划的事件调用 Lambda 函数