Using the CloudTrail Processing Library
The CloudTrail Processing Library is a Java library that provides an easy way to process AWS CloudTrail logs. You provide configuration details about your CloudTrail SQS queue and write code to process events. The CloudTrail Processing Library does the rest. It polls your Amazon SQS queue, reads and parses queue messages, downloads CloudTrail log files, parses events in the log files, and passes the events to your code as Java objects.
The CloudTrail Processing Library is highly scalable and fault-tolerant. It handles parallel processing of log files so that you can process as many logs as needed. It handles network failures related to network timeouts and inaccessible resources.
The following topic shows you how to use the CloudTrail Processing Library to process CloudTrail logs in your Java projects.
The library is provided as an Apache-licensed open-source project, available on GitHub:
https://github.com/aws/aws-cloudtrail-processing-library
Minimum requirements
To use the CloudTrail Processing Library, you must have the following:
Processing CloudTrail logs
To process CloudTrail logs in your Java application:
Adding the CloudTrail Processing Library to your project
To use the CloudTrail Processing Library, add it to your Java project's classpath.
Adding the library to an Apache Ant project
To add the CloudTrail Processing Library to an Apache Ant project
-
Download or clone the CloudTrail Processing Library source code from GitHub:
https://github.com/aws/aws-cloudtrail-processing-library
-
Build the .jar file from source as described in the README:
    mvn clean install -Dgpg.skip=true
-
Copy the resulting .jar file into your project and add it to your project's build.xml file. For example:
    <classpath>
      <pathelement path="${classpath}"/>
      <pathelement location="lib/aws-cloudtrail-processing-library-1.6.1.jar"/>
    </classpath>
Adding the library to an Apache Maven project
The CloudTrail Processing Library is available for Apache Maven. You can add it to your project by adding a dependency to your project's pom.xml file.
To add the CloudTrail Processing Library to a Maven project
-
Open your Maven project's pom.xml file and add the following dependency:
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-cloudtrail-processing-library</artifactId>
      <version>1.6.1</version>
    </dependency>
Adding the library to an Eclipse project
To add the CloudTrail Processing Library to an Eclipse project
-
Download or clone the CloudTrail Processing Library source code from GitHub:
https://github.com/aws/aws-cloudtrail-processing-library
-
Build the .jar file from source as described in the README:
    mvn clean install -Dgpg.skip=true
-
Copy the built aws-cloudtrail-processing-library-1.6.1.jar to a directory in your project (typically lib).
-
Right-click your project's name in the Eclipse Project Explorer, choose Build Path, and then choose Configure Build Path.
-
In the Java Build Path window, choose the Libraries tab.
-
Choose Add JARs... and navigate to the path where you copied aws-cloudtrail-processing-library-1.6.1.jar.
-
Choose OK to complete adding the .jar to your project.
Adding the library to an IntelliJ project
To add the CloudTrail Processing Library to an IntelliJ project
-
Download or clone the CloudTrail Processing Library source code from GitHub:
https://github.com/aws/aws-cloudtrail-processing-library
-
Build the .jar file from source as described in the README:
    mvn clean install -Dgpg.skip=true
-
From File, choose Project Structure.
-
Choose Modules and then choose Dependencies.
-
Choose + JARS or Directories and then go to the path where you built the aws-cloudtrail-processing-library-1.6.1.jar.
-
Choose Apply and then choose OK to complete adding the .jar to your project.
Configuring the CloudTrail Processing Library
You can configure the CloudTrail Processing Library by creating a classpath properties file that is loaded
at runtime, or by creating a ClientConfiguration
object and setting options
manually.
Providing a properties file
You can write a classpath properties file that provides configuration options to your application. The following example file shows the options you can set:
    # AWS access key. (Required)
    accessKey = your_access_key

    # AWS secret key. (Required)
    secretKey = your_secret_key

    # The SQS URL used to pull CloudTrail notification from. (Required)
    sqsUrl = your_sqs_queue_url

    # The SQS end point specific to a region.
    sqsRegion = us-east-1

    # A period of time during which Amazon SQS prevents other consuming components
    # from receiving and processing that message.
    visibilityTimeout = 60

    # The S3 region to use.
    s3Region = us-east-1

    # Number of threads used to download S3 files in parallel. Callbacks can be
    # invoked from any thread.
    threadCount = 1

    # The time allowed, in seconds, for threads to shut down after
    # AWSCloudTrailEventProcessingExecutor.stop() is called. If they are still
    # running beyond this time, they will be forcibly terminated.
    threadTerminationDelaySeconds = 60

    # The maximum number of AWSCloudTrailClientEvents sent to a single invocation
    # of processEvents().
    maxEventsPerEmit = 10

    # Whether to include raw event information in CloudTrailDeliveryInfo.
    enableRawEventInfo = false

    # Whether to delete SQS message when the CloudTrail Processing Library is unable to process the notification.
    deleteMessageUponFailure = false
The following parameters are required:
-
sqsUrl – Provides the URL from which to pull your CloudTrail notifications. If you don't specify this value, the AWSCloudTrailProcessingExecutor throws an IllegalStateException.
-
accessKey – The access key ID for your account, such as AKIAIOSFODNN7EXAMPLE.
-
secretKey – The secret access key paired with that access key ID, such as wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY.
The accessKey
and secretKey
parameters provide your AWS
credentials to the library so the library can access AWS on your behalf.
Defaults for the other parameters are set by the library. For more information, see the AWS CloudTrail Processing Library Reference.
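Because a missing sqsUrl only surfaces as an exception when the executor is built, it can help to check a properties file before handing it to the library. The following is a minimal sketch that uses only java.util.Properties. The class ProcessingPropertiesCheck and its methods are hypothetical helpers and are not part of the CloudTrail Processing Library.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper; not part of the CloudTrail Processing Library. */
final class ProcessingPropertiesCheck {

    // The three keys that this topic lists as required.
    static final List<String> REQUIRED_KEYS =
            Arrays.asList("sqsUrl", "accessKey", "secretKey");

    /** Parses the text of a properties file into a Properties object. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the required keys that are absent or blank, in declaration order. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

An application could call missingKeys() at startup and stop with a clear message if the returned list is not empty.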
Creating a ClientConfiguration
Instead of setting options in the classpath properties, you can provide options to the
AWSCloudTrailProcessingExecutor
by initializing and setting options on a
ClientConfiguration
object, as shown in the following example:
    ClientConfiguration basicConfig = new ClientConfiguration(
        "http://sqs.us-east-1.amazonaws.com/123456789012/queue2",
        new DefaultAWSCredentialsProviderChain());

    basicConfig.setEnableRawEventInfo(true);
    basicConfig.setThreadCount(4);
    basicConfig.setMaxEventsPerEmit(20);
Implementing the events processor
To process CloudTrail logs, you must implement an EventsProcessor
that receives
the CloudTrail log data. The following is an example implementation:
    public class SampleEventsProcessor implements EventsProcessor {

        // Called by the library with each batch of parsed events.
        public void process(List<CloudTrailEvent> events) {
            int i = 0;
            for (CloudTrailEvent event : events) {
                System.out.println(String.format("Process event %d : %s", i++, event.getEventData()));
            }
        }
    }
When implementing an EventsProcessor, you implement the process() callback that the AWSCloudTrailProcessingExecutor uses to send you CloudTrail events. Events are provided in a list of CloudTrailEvent objects.
The CloudTrailEvent object provides a CloudTrailEventData and CloudTrailEventMetadata that you can use to read the CloudTrail event and delivery information.
This simple example prints the event information for each event passed to
SampleEventsProcessor
. In your own implementation, you can process logs as
you see fit. The AWSCloudTrailProcessingExecutor
continues to send events to
your EventsProcessor
as long as it has events to send and is still
running.
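The maxEventsPerEmit setting caps how many events arrive in a single callback, so a processor should not assume that it receives a whole log file at once. The following sketch illustrates that idea by splitting a list into consecutive batches. EmitBatches is a hypothetical class written for this illustration, and the library's internal batching may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration; the library's own batching logic may differ. */
final class EmitBatches {

    /** Splits events into consecutive batches of at most maxEventsPerEmit items. */
    static <T> List<List<T>> split(List<T> events, int maxEventsPerEmit) {
        if (maxEventsPerEmit < 1) {
            throw new IllegalArgumentException("maxEventsPerEmit must be at least 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < events.size(); start += maxEventsPerEmit) {
            int end = Math.min(start + maxEventsPerEmit, events.size());
            // Copy the slice so that each batch is independent of the input list.
            batches.add(new ArrayList<>(events.subList(start, end)));
        }
        return batches;
    }
}
```

With 25 events and a limit of 10, this produces batches of 10, 10, and 5 events, so any per-file totals have to be accumulated across calls.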
Instantiating and running the processing executor
After you write an EventsProcessor
and set configuration values for the
CloudTrail Processing Library (either in a properties file or by using the ClientConfiguration
class), you can use these elements to initialize and use an
AWSCloudTrailProcessingExecutor
.
To use AWSCloudTrailProcessingExecutor
to process CloudTrail events
-
Instantiate an AWSCloudTrailProcessingExecutor.Builder object. Builder's constructor takes an EventsProcessor object and a classpath properties file name.
-
Call the Builder's build() factory method to configure and obtain an AWSCloudTrailProcessingExecutor object.
-
Use the AWSCloudTrailProcessingExecutor's start() and stop() methods to begin and end CloudTrail event processing.
    public class SampleApp {
        public static void main(String[] args) throws InterruptedException {
            AWSCloudTrailProcessingExecutor executor = new AWSCloudTrailProcessingExecutor.Builder(
                new SampleEventsProcessor(),
                "/myproject/cloudtrailprocessing.properties").build();

            executor.start();
            Thread.sleep(24 * 60 * 60 * 1000); // let it run for a while (optional)
            executor.stop(); // optional
        }
    }
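The threadTerminationDelaySeconds setting describes a stop sequence in which worker threads get a grace period and are then forcibly terminated. The following sketch shows that general pattern with a plain java.util.concurrent.ExecutorService. GracefulStop is a hypothetical class, and this is not the library's own implementation of stop().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of a "grace period, then force" shutdown. */
final class GracefulStop {

    /**
     * Asks the pool to stop, waits up to delaySeconds for running tasks,
     * and then interrupts whatever is still running.
     * Returns true if all tasks finished within the grace period.
     */
    static boolean stop(ExecutorService pool, long delaySeconds) {
        pool.shutdown(); // accept no new tasks, let running tasks finish
        try {
            if (pool.awaitTermination(delaySeconds, TimeUnit.SECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        pool.shutdownNow(); // interrupt tasks that are still running
        return false;
    }
}
```

Long-running callbacks that ignore interruption can outlive even the forced step, so it is worth keeping process() implementations short or interruptible.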
Advanced topics
Filtering the events to process
By default, all logs in your Amazon SQS queue's S3 bucket and all events that they contain
are sent to your EventsProcessor
. The CloudTrail Processing Library provides optional interfaces
that you can implement to filter the sources used to obtain CloudTrail logs and to filter the
events that you are interested in processing.
SourceFilter
-
You can implement the SourceFilter interface to choose whether you want to process logs from a provided source. SourceFilter declares a single callback method, filterSource(), that receives a CloudTrailSource object. To keep events from a source from being processed, return false from filterSource().
The CloudTrail Processing Library calls the filterSource() method after the library polls for logs on the Amazon SQS queue. This occurs before the library starts event filtering or processing for the logs.
The following is an example implementation:
    public class SampleSourceFilter implements SourceFilter {
        private static final int MAX_RECEIVED_COUNT = 3;

        private static List<String> accountIDs;
        static {
            accountIDs = new ArrayList<>();
            accountIDs.add("123456789012");
            accountIDs.add("234567890123");
        }

        @Override
        public boolean filterSource(CloudTrailSource source) throws CallbackException {
            SQSBasedSource sqsSource = (SQSBasedSource) source;
            Map<String, String> sourceAttributes = sqsSource.getSourceAttributes();

            String accountId = sourceAttributes.get(
                SourceAttributeKeys.ACCOUNT_ID.getAttributeKey());

            String receivedCount = sourceAttributes.get(
                SourceAttributeKeys.APPROXIMATE_RECEIVE_COUNT.getAttributeKey());

            int approximateReceivedCount = Integer.parseInt(receivedCount);

            // Accept only known accounts, and skip messages that were redelivered too often.
            return approximateReceivedCount <= MAX_RECEIVED_COUNT && accountIDs.contains(accountId);
        }
    }
If you don't provide your own SourceFilter, then DefaultSourceFilter is used, which allows all sources to be processed (it always returns true).
EventFilter
-
You can implement the EventFilter interface to choose whether a CloudTrail event is sent to your EventsProcessor. EventFilter declares a single callback method, filterEvent(), that receives a CloudTrailEvent object. To keep the event from being processed, return false from filterEvent().
The CloudTrail Processing Library calls the filterEvent() method after the library polls for logs on the Amazon SQS queue and after source filtering. This occurs before the library starts event processing for the logs.
See the following example implementation:
    public class SampleEventFilter implements EventFilter {
        private static final String EC2_EVENTS = "ec2.amazonaws.com";

        @Override
        public boolean filterEvent(CloudTrailEvent event) throws CallbackException {
            CloudTrailEventData eventData = event.getEventData();

            String eventSource = eventData.getEventSource();
            String eventName = eventData.getEventName();

            // Keep only EC2 events whose name starts with "Delete".
            return eventSource.equals(EC2_EVENTS) && eventName.startsWith("Delete");
        }
    }
If you don't provide your own EventFilter, then DefaultEventFilter is used, which allows all events to be processed (it always returns true).
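The order described above, source filtering first and event filtering second, can be sketched with plain java.util.function.Predicate objects standing in for the two filter interfaces. FilterPipelineSketch, its run() method, and the use of strings for sources and events are assumptions made for this illustration. They are not library types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical stand-in for the filtering order; not library code. */
final class FilterPipelineSketch {

    /**
     * sources maps a source identifier (here, an account ID) to the event
     * names found in that source's logs. Returns the event names that pass
     * both filters, in iteration order.
     */
    static List<String> run(Map<String, List<String>> sources,
                            Predicate<String> sourceFilter,
                            Predicate<String> eventFilter) {
        List<String> delivered = new ArrayList<>();
        for (Map.Entry<String, List<String>> source : sources.entrySet()) {
            if (!sourceFilter.test(source.getKey())) {
                continue; // rejected source: none of its events are examined
            }
            for (String eventName : source.getValue()) {
                if (eventFilter.test(eventName)) {
                    delivered.add(eventName); // would be handed to the events processor
                }
            }
        }
        return delivered;
    }
}
```

Because a rejected source is skipped entirely, checks that can be made from source attributes alone belong in the source filter, which avoids downloading and parsing logs that would be discarded anyway.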
Processing data events
CloudTrail preserves numbers in data events in their original format, whether that is an integer (int) or a float (a number that contains a decimal). Historically, CloudTrail processed integers in the fields of a data event as floats.
As a best practice, to avoid breaking your automations, be flexible in any code or automation that you are using
to process or filter CloudTrail data events, and allow both int
and float
formatted numbers. For best results,
use version 1.4.0 or higher of the CloudTrail Processing Library.
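One way to accept both formats is to compare numeric values instead of their text. The following sketch uses java.math.BigDecimal, whose compareTo() method ignores trailing zeros. TolerantNumbers is a hypothetical helper and not part of the CloudTrail Processing Library.

```java
import java.math.BigDecimal;

/** Hypothetical helper for comparing numbers that may arrive as int or float. */
final class TolerantNumbers {

    /** True when the JSON number text has the expected value, in either format. */
    static boolean sameValue(String jsonNumberText, long expected) {
        // compareTo() treats 2 and 2.0 as equal; equals() would not.
        return new BigDecimal(jsonNumberText.trim())
                .compareTo(BigDecimal.valueOf(expected)) == 0;
    }

    /** Same check for a value that a JSON parser already turned into a Number. */
    static boolean sameValue(Number parsed, long expected) {
        return sameValue(parsed.toString(), expected);
    }
}
```

A filter written this way treats a desiredCount of 2 and a desiredCount of 2.0 as the same value, and still rejects 2.5.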
The following example snippet shows a float formatted number, 2.0, for the desiredCount parameter in the requestParameters block of a data event.
    "eventName": "CreateService",
    "awsRegion": "us-east-1",
    "sourceIPAddress": "000.00.00.00",
    "userAgent": "console.amazonaws.com",
    "requestParameters": {
        "clientToken": "EXAMPLE",
        "cluster": "default",
        "desiredCount": 2.0
    ...
The following example snippet shows an int formatted number, 2, for the desiredCount parameter in the requestParameters block of a data event.
    "eventName": "CreateService",
    "awsRegion": "us-east-1",
    "sourceIPAddress": "000.00.00.00",
    "userAgent": "console.amazonaws.com",
    "requestParameters": {
        "clientToken": "EXAMPLE",
        "cluster": "default",
        "desiredCount": 2
    ...
Reporting progress
Implement the ProgressReporter
interface to customize the reporting of
CloudTrail Processing Library progress. ProgressReporter
declares two methods:
reportStart()
and reportEnd()
, which are called at the beginning
and end of the following operations:
-
Polling messages from Amazon SQS
-
Parsing messages from Amazon SQS
-
Processing an Amazon SQS source for CloudTrail logs
-
Deleting messages from Amazon SQS
-
Downloading a CloudTrail log file
-
Processing a CloudTrail log file
Both methods receive a ProgressStatus
object that contains information
about the operation that was performed. The progressState
member holds a member
of the ProgressState
enumeration that identifies the current operation. This
member can contain additional information in the progressInfo
member.
Additionally, any object that you return from reportStart()
is passed to
reportEnd()
, so you can provide contextual information such as the time when
the event began processing.
The following is an example implementation that provides information about how long an operation took to complete:
    public class SampleProgressReporter implements ProgressReporter {
        private static final Log logger = LogFactory.getLog(SampleProgressReporter.class);

        @Override
        public Object reportStart(ProgressStatus status) {
            // The returned object is passed back to reportEnd() as context.
            return new Date();
        }

        @Override
        public void reportEnd(ProgressStatus status, Object startDate) {
            System.out.println(status.getProgressState().toString() + " is "
                + status.getProgressInfo().isSuccess() + " , and latency is "
                + Math.abs(((Date) startDate).getTime() - new Date().getTime())
                + " milliseconds.");
        }
    }
If you don't implement your own ProgressReporter, then DefaultProgressReporter, which prints the name of the state being run, is used instead.
Handling errors
The ExceptionHandler
interface allows you to provide special handling when
an exception occurs during log processing. ExceptionHandler
declares a single
callback method, handleException()
, which receives a
ProcessingLibraryException
object with context about the exception that
occurred.
You can use the passed-in ProcessingLibraryException
's
getStatus()
method to find out what operation was executed when the exception
occurred and get additional information about the status of the operation.
ProcessingLibraryException
is derived from Java's standard
Exception
class, so you can also retrieve information about the exception by
invoking any of the exception methods.
See the following example implementation:
    public class SampleExceptionHandler implements ExceptionHandler {
        private static final Log logger = LogFactory.getLog(SampleExceptionHandler.class);

        @Override
        public void handleException(ProcessingLibraryException exception) {
            // The status describes which operation was running when the exception occurred.
            ProgressStatus status = exception.getStatus();
            ProgressState state = status.getProgressState();
            ProgressInfo info = status.getProgressInfo();

            System.err.println(String.format(
                "Exception. Progress State: %s. Progress Information: %s.", state, info));
        }
    }
If you don't provide your own ExceptionHandler
, then
DefaultExceptionHandler
, which prints a standard error message, is used
instead.
Note
If the deleteMessageUponFailure
parameter is true
, the
CloudTrail Processing Library does not distinguish general exceptions from processing errors and may delete
queue messages.
-
For example, you use the SourceFilter to filter messages by timestamp.
-
However, you don't have the required permissions to access the S3 bucket that receives the CloudTrail log files. Because you don't have the required permissions, an AmazonServiceException is thrown. The CloudTrail Processing Library wraps this in a CallbackException.
-
The DefaultExceptionHandler logs this as an error, but does not identify the root cause, which is that you don't have the required permissions. The CloudTrail Processing Library considers this a processing error and deletes the message, even if the message includes a valid CloudTrail log file.
If you want to filter messages with SourceFilter
, verify that your
ExceptionHandler
can distinguish service exceptions from processing errors.
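Because the library wraps the original exception, a handler can look for the root cause by walking the chain returned by Throwable.getCause(). The following sketch uses only standard Java. RootCauseSketch is a hypothetical helper, and in a real handler you would pass it the exception received by handleException() together with the service exception class that you want to detect.

```java
/** Hypothetical helper for inspecting wrapped exceptions; not library code. */
final class RootCauseSketch {

    /** Returns the deepest cause in the chain, or the throwable itself if it has none. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** True if the throwable or any of its causes is an instance of the given type. */
    static boolean causedBy(Throwable t, Class<? extends Throwable> type) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (type.isInstance(c)) {
                return true;
            }
        }
        return false;
    }
}
```

A handler that finds a permissions or service exception in the chain can log it distinctly, which makes an access problem visible instead of reporting it as an ordinary processing error.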
Additional resources
For more information about the CloudTrail Processing Library, see the following:
-
CloudTrail Processing Library
GitHub project, which includes sample code that demonstrates how to implement a CloudTrail Processing Library application.