

# Send data to a Firehose stream
<a name="basic-write"></a>

This section describes how you can use different data sources to send data to your Firehose stream. If you are new to Amazon Data Firehose, take some time to become familiar with the concepts and terminology presented in [What is Amazon Data Firehose?](what-is-this-service.md).

**Note**  
Some AWS services can only send messages and events to a Firehose stream that is in the same Region. If your Firehose stream doesn't appear as an option when you're configuring a target for Amazon CloudWatch Logs, CloudWatch Events, or AWS IoT, verify that your Firehose stream is in the same Region as your other services. For information on service endpoints for each Region, see [Amazon Data Firehose endpoints](https://docs.aws.amazon.com/general/latest/gr/fh.html#fh_region).

You can send data to your Firehose stream from the following data sources.

**Topics**
+ [Configure Kinesis agent to send data](writing-with-agents.md)
+ [Send data with AWS SDK](writing-with-sdk.md)
+ [Send CloudWatch Logs to Firehose](writing-with-cloudwatch-logs.md)
+ [Send CloudWatch Events to Firehose](writing-with-cloudwatch-events.md)
+ [Configure AWS IoT to send data to Firehose](writing-with-iot.md)

# Configure Kinesis agent to send data
<a name="writing-with-agents"></a>

Amazon Kinesis agent is a standalone Java software application that serves as a reference implementation to show how you can collect and send data to Firehose. The agent continuously monitors a set of files and sends new data to your Firehose stream. It shows how you can handle file rotation, checkpointing, and retry upon failures, deliver your data in a reliable, timely, and simple manner, and emit CloudWatch metrics to better monitor and troubleshoot the streaming process. To learn more, see [awslabs/amazon-kinesis-agent](https://github.com/awslabs/amazon-kinesis-agent).

By default, records are parsed from each file based on the newline (`'\n'`) character. However, the agent can also be configured to parse multi-line records (see [Specify agent configuration settings](agent-config-settings.md)). 

You can install the agent on Linux-based server environments such as web servers, log servers, and database servers. After installing the agent, configure it by specifying the files to monitor and the Firehose stream for the data. After the agent is configured, it durably collects data from the files and reliably sends it to the Firehose stream.

## Prerequisites
<a name="prereqs"></a>

Before you start using Kinesis Agent, make sure you meet the following prerequisites.
+ Your operating system must be Amazon Linux, or Red Hat Enterprise Linux version 7 or later. 
+ Agent version 2.0.0 or later runs using JRE version 1.8 or later. Agent version 1.1.x runs using JRE 1.7 or later. 
+ If you are using Amazon EC2 to run your agent, launch your EC2 instance.
+ The IAM role or AWS credentials that you specify must have permission to perform the Amazon Data Firehose [PutRecordBatch](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html) operation for the agent to send data to your Firehose stream. If you enable CloudWatch monitoring for the agent, permission to perform the CloudWatch [PutMetricData](https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html) operation is also needed. For more information, see [Controlling access with Amazon Data Firehose](controlling-access.md), [Monitor Kinesis Agent health](agent-health.md), and [Authentication and Access Control for Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/auth-and-access-control-cw.html).
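
  For example, a minimal IAM policy that grants these two permissions might look similar to the following sketch. The Region, account ID, and stream name in the ARN are placeholders that you replace with your own values.

  ```
  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": "firehose:PutRecordBatch",
              "Resource": "arn:aws:firehose:us-east-1:123456789012:deliverystream/yourdeliverystream"
          },
          {
              "Effect": "Allow",
              "Action": "cloudwatch:PutMetricData",
              "Resource": "*"
          }
      ]
  }
  ```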

# Manage AWS credentials
<a name="agent-credentials"></a>

Manage your AWS credentials using one of the following methods:
+ Create a custom credentials provider. For details, see [Create custom credential providers](custom-cred-provider.md).
+ Specify an IAM role when you launch your EC2 instance.
+ Specify AWS credentials when you configure the agent (see the entries for `awsAccessKeyId` and `awsSecretAccessKey` in the configuration table under [Specify agent configuration settings](agent-config-settings.md)).
+ Edit `/etc/sysconfig/aws-kinesis-agent` to specify your AWS Region and AWS access keys.
+ If your EC2 instance is in a different AWS account, create an IAM role to provide access to the Amazon Data Firehose service. Specify that role when you configure the agent (see [assumeRoleARN](agent-config-settings.md#assumeRoleARN) and [assumeRoleExternalId](agent-config-settings.md#assumeRoleExternalId)). Use one of the previous methods to specify the AWS credentials of a user in the other account who has permission to assume this role.
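
For example, to send data to a Firehose stream in a different account by assuming a role, your agent configuration file might include settings similar to the following sketch. The role ARN and external ID are placeholders, and you can specify `awsAccessKeyId` and `awsSecretAccessKey` at the same level in the same way.

```
{
    "assumeRoleARN": "arn:aws:iam::123456789012:role/FirehoseWriteRole",
    "assumeRoleExternalId": "your-external-id",
    "flows": [
        {
            "filePattern": "/tmp/app.log*",
            "deliveryStream": "yourdeliverystream"
        }
    ]
}
```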

# Create custom credential providers
<a name="custom-cred-provider"></a>

You can create a custom credentials provider and give its class name and jar path to the Kinesis agent in the following configuration settings: `userDefinedCredentialsProvider.classname` and `userDefinedCredentialsProvider.location`. For the descriptions of these two configuration settings, see [Specify agent configuration settings](agent-config-settings.md).
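
For example, an agent configuration that references a custom credentials provider might look similar to the following sketch. The class name and jar path are placeholders for your own implementation.

```
{
    "userDefinedCredentialsProvider.classname": "com.yourcompany.YourClassName",
    "userDefinedCredentialsProvider.location": "/usr/share/aws-kinesis-agent/lib/your-credentials-provider.jar",
    "flows": [
        {
            "filePattern": "/tmp/app.log*",
            "deliveryStream": "yourdeliverystream"
        }
    ]
}
```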

To create a custom credentials provider, define a class that implements the `AWSCredentialsProvider` interface, like the one in the following example.

```
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

public class YourClassName implements AWSCredentialsProvider {
    // The agent requires a constructor that takes no arguments.
    public YourClassName() {
    }

    // Returns the credentials that the agent uses to call Amazon Data Firehose.
    @Override
    public AWSCredentials getCredentials() {
        return new BasicAWSCredentials("key1", "key2");
    }

    // Invoked periodically to refresh the credentials.
    // Leave empty to vend static (non-changing) credentials.
    @Override
    public void refresh() {
    }
}
```

Your class must have a constructor that takes no arguments.

AWS invokes the refresh method periodically to get updated credentials. If you want your credentials provider to provide different credentials throughout its lifetime, include code to refresh the credentials in this method. Alternatively, you can leave this method empty if you want a credentials provider that vends static (non-changing) credentials. 

# Download and install the Agent
<a name="download-install"></a>

First, connect to your instance. For more information, see [Connect to Your Instance](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-connect-to-instance-linux.html) in the *Amazon EC2 User Guide*. If you have trouble connecting, see [Troubleshooting Connecting to Your Instance](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/TroubleshootingInstancesConnecting.html) in the *Amazon EC2 User Guide*.

Next, install the agent using one of the following methods.
+ **To set up the agent from the Amazon Linux repositories**

  This method works only for Amazon Linux instances. Use the following command:

  ```
  sudo yum install -y aws-kinesis-agent
  ```

  Agent version 2.0.0 or later is installed on instances running Amazon Linux 2 (AL2). This agent version requires Java 1.8 or later. If the required Java version is not present, the agent installation process installs it. For more information about Amazon Linux 2, see [https://aws.amazon.com/amazon-linux-2/](https://aws.amazon.com/amazon-linux-2/).
+ **To set up the agent from the Amazon S3 repository**

  This method works for Red Hat Enterprise Linux as well as Amazon Linux 2 instances, because it installs the agent from the publicly available repository. Use the following command to download and install the latest 2.x.x version of the agent:

  ```
  sudo yum install -y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
  ```

  To install a specific version of the agent, specify the version number in the command. For example, the following command installs agent version 2.0.1.

  ```
  sudo yum install -y https://streaming-data-agent.s3.amazonaws.com/aws-kinesis-agent-2.0.1-1.amzn1.noarch.rpm
  ```

  If you have Java 1.7 and you don’t want to upgrade it, you can download agent version 1.x.x, which is compatible with Java 1.7. For example, to download agent v1.1.6, you can use the following command: 

  ```
  sudo yum install -y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-1.1.6-1.amzn1.noarch.rpm
  ```

+ **To set up the agent from the GitHub repo**

  1. First, make sure that you have the required Java version installed for your agent version.

  1.  Download the agent from the [awslabs/amazon-kinesis-agent](https://github.com/awslabs/amazon-kinesis-agent) GitHub repo.

  1. Install the agent by navigating to the download directory and running the following command:

     ```
     sudo ./setup --install
     ```
+ **To set up the agent in a Docker container**

  Kinesis Agent can also run in a container using the [amazonlinux](https://docs.aws.amazon.com/AmazonECR/latest/userguide/amazon_linux_container_image.html) container base image. Use the following Dockerfile, and then run `docker build`.

  ```
  FROM amazonlinux
  
  RUN yum install -y aws-kinesis-agent which findutils
  COPY agent.json /etc/aws-kinesis/agent.json
  
  CMD ["start-aws-kinesis-agent"]
  ```

# Configure and start the Agent
<a name="config-start"></a>

**To configure and start the agent**

1. Open and edit the configuration file (as superuser if using default file access permissions): `/etc/aws-kinesis/agent.json` 

   In this configuration file, specify the files ( `"filePattern"` ) from which the agent collects data, and the name of the Firehose stream ( `"deliveryStream"` ) to which the agent sends data. The file name is a pattern, and the agent recognizes file rotations. You can rotate files or create new files no more than once per second. The agent uses the file creation time stamp to determine which files to track and tail into your Firehose stream. Creating new files or rotating files more frequently than once per second does not allow the agent to differentiate properly between them.

   ```
   { 
      "flows": [
           { 
               "filePattern": "/tmp/app.log*", 
               "deliveryStream": "yourdeliverystream"
           } 
      ] 
   }
   ```

   The default AWS Region is `us-east-1`. If you are using a different Region, add the `firehose.endpoint` setting to the configuration file, specifying the endpoint for your Region, as shown in the example after this procedure. For more information, see [Specify agent configuration settings](agent-config-settings.md).

1. Start the agent manually:

   ```
   sudo service aws-kinesis-agent start
   ```

1. (Optional) Configure the agent to start on system startup:

   ```
   sudo chkconfig aws-kinesis-agent on
   ```

The agent is now running as a system service in the background. It continuously monitors the specified files and sends data to the specified Firehose stream. Agent activity is logged in `/var/log/aws-kinesis-agent/aws-kinesis-agent.log`. 
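
For example, a configuration that sends data to a Firehose stream in US West (Oregon) might look similar to the following sketch, assuming the standard `firehose.us-west-2.amazonaws.com` endpoint for that Region. The file pattern and stream name are placeholders.

```
{
    "firehose.endpoint": "firehose.us-west-2.amazonaws.com",
    "flows": [
        {
            "filePattern": "/tmp/app.log*",
            "deliveryStream": "yourdeliverystream"
        }
    ]
}
```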

# Specify agent configuration settings
<a name="agent-config-settings"></a>

The agent supports two mandatory configuration settings, `filePattern` and `deliveryStream`, plus optional configuration settings for additional features. You can specify both mandatory and optional configuration settings in `/etc/aws-kinesis/agent.json`.

Whenever you change the configuration file, you must stop and start the agent, using the following commands:

```
sudo service aws-kinesis-agent stop
sudo service aws-kinesis-agent start
```

Alternatively, you could use the following command:

```
sudo service aws-kinesis-agent restart
```

The following are the general configuration settings.


| Configuration Setting | Description | 
| --- | --- | 
| <a name="assumeRoleARN"></a>assumeRoleARN |  The Amazon Resource Name (ARN) of the role to be assumed by the user. For more information, see [Delegate Access Across AWS Accounts Using IAM Roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_cross-account-with-roles.html) in the *IAM User Guide*.  | 
| <a name="assumeRoleExternalId"></a>assumeRoleExternalId |  An optional identifier that determines who can assume the role. For more information, see [How to Use an External ID](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html) in the *IAM User Guide*.  | 
| <a name="awsAccessKeyId"></a>awsAccessKeyId |  AWS access key ID that overrides the default credentials. This setting takes precedence over all other credential providers.  | 
| <a name="awsSecretAccessKey"></a>awsSecretAccessKey |  AWS secret key that overrides the default credentials. This setting takes precedence over all other credential providers.  | 
| cloudwatch.emitMetrics |  Enables the agent to emit metrics to CloudWatch when set to true. Default: true  | 
| cloudwatch.endpoint |  The regional endpoint for CloudWatch. Default: `monitoring.us-east-1.amazonaws.com`  | 
| firehose.endpoint |  The regional endpoint for Amazon Data Firehose. Default: `firehose.us-east-1.amazonaws.com`  | 
| sts.endpoint |  The regional endpoint for the AWS Security Token Service. Default: `https://sts.amazonaws.com`  | 
| userDefinedCredentialsProvider.classname | If you define a custom credentials provider, provide its fully-qualified class name using this setting. Don't include .class at the end of the class name.  | 
| userDefinedCredentialsProvider.location | If you define a custom credentials provider, use this setting to specify the absolute path of the jar that contains the custom credentials provider. The agent also looks for the jar file in the following location: /usr/share/aws-kinesis-agent/lib/. | 

The following are the flow configuration settings.


| Configuration Setting | Description | 
| --- | --- | 
| aggregatedRecordSizeBytes |  To make the agent aggregate records and then put them to the Firehose stream in one operation, specify this setting. Set it to the size that you want the aggregate record to have before the agent puts it to the Firehose stream.  Default: 0 (no aggregation)  | 
| dataProcessingOptions |  The list of processing options applied to each parsed record before it is sent to the Firehose stream. The processing options are performed in the specified order. For more information, see [Pre-process data with Agents](pre-processing.md).  | 
| deliveryStream |  [Required] The name of the Firehose stream.  | 
| filePattern |  [Required] A glob for the files that need to be monitored by the agent. Any file that matches this pattern is picked up by the agent automatically and monitored. For all files matching this pattern, grant read permission to `aws-kinesis-agent-user`. For the directory containing the files, grant read and execute permissions to `aws-kinesis-agent-user`.  The agent picks up any file that matches this pattern. To ensure that the agent doesn't pick up unintended records, choose this pattern carefully.   | 
| initialPosition |  The position in the file from which the agent starts parsing. Valid values are `START_OF_FILE` and `END_OF_FILE`. Default: `END_OF_FILE`  | 
| maxBufferAgeMillis |  The maximum time, in milliseconds, for which the agent buffers data before sending it to the Firehose stream. Value range: 1,000–900,000 (1 second to 15 minutes) Default: 60,000 (1 minute)  | 
| maxBufferSizeBytes |  The maximum size, in bytes, for which the agent buffers data before sending it to the Firehose stream. Value range: 1–4,194,304 (4 MB) Default: 4,194,304 (4 MB)  | 
| maxBufferSizeRecords |  The maximum number of records for which the agent buffers data before sending it to the Firehose stream. Value range: 1–500 Default: 500  | 
| minTimeBetweenFilePollsMillis |  The time interval, in milliseconds, at which the agent polls and parses the monitored files for new data. Value range: 1 or more Default: 100  | 
| multiLineStartPattern |  The pattern for identifying the start of a record. A record is made of a line that matches the pattern and any following lines that don't match the pattern. The valid values are regular expressions. By default, each new line in the log files is parsed as one record.  | 
| skipHeaderLines |  The number of lines for the agent to skip parsing at the beginning of monitored files. Value range: 0 or more Default: 0 (zero)  | 
| truncatedRecordTerminator |  The string that the agent uses to truncate a parsed record when the record size exceeds the Amazon Data Firehose record size limit (1,000 KB). Default: `'\n'` (newline)  | 
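
For example, a flow that aggregates records, parses each file from the beginning, and skips a header line might look similar to the following sketch. The values shown are illustrative placeholders, not recommendations.

```
{
    "flows": [
        {
            "filePattern": "/tmp/app.log*",
            "deliveryStream": "yourdeliverystream",
            "initialPosition": "START_OF_FILE",
            "skipHeaderLines": 1,
            "aggregatedRecordSizeBytes": 1048576,
            "maxBufferAgeMillis": 30000
        }
    ]
}
```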

# Configure multiple file directories and streams
<a name="sim-writes"></a>

By specifying multiple flow configuration settings, you can configure the agent to monitor multiple file directories and send data to multiple streams. In the following configuration example, the agent monitors two file directories and sends data to a Kinesis data stream and a Firehose stream respectively. You can specify different endpoints for Kinesis Data Streams and Amazon Data Firehose so that your data stream and Firehose stream don’t need to be in the same Region.

```
{
    "cloudwatch.emitMetrics": true,
    "kinesis.endpoint": "https://your/kinesis/endpoint", 
    "firehose.endpoint": "https://your/firehose/endpoint", 
    "flows": [
        {
            "filePattern": "/tmp/app1.log*", 
            "kinesisStream": "yourkinesisstream"
        }, 
        {
            "filePattern": "/tmp/app2.log*",
            "deliveryStream": "yourfirehosedeliverystream" 
        }
    ] 
}
```

For more detailed information about using the agent with Amazon Kinesis Data Streams, see [Writing to Amazon Kinesis Data Streams with Kinesis Agent](https://docs.aws.amazon.com/kinesis/latest/dev/writing-with-agents.html).

# Pre-process data with Agents
<a name="pre-processing"></a>

The agent can pre-process the records parsed from monitored files before sending them to your Firehose stream. You can enable this feature by adding the `dataProcessingOptions` configuration setting to your file flow. One or more processing options can be added, and they are performed in the specified order.

The agent supports the following processing options. Because the agent is open source, you can further develop and extend its processing options. You can download the agent from [Kinesis Agent](https://github.com/awslabs/amazon-kinesis-agent).

**Processing Options**

`SINGLELINE`  
Converts a multi-line record to a single-line record by removing newline characters, leading spaces, and trailing spaces.  

```
{
    "optionName": "SINGLELINE"
}
```

`CSVTOJSON`  
Converts a record from delimiter-separated format to JSON format.  

```
{
    "optionName": "CSVTOJSON",
    "customFieldNames": [ "field1", "field2", ... ],
    "delimiter": "yourdelimiter"
}
```  
`customFieldNames`  
[Required] The field names used as keys in each JSON key value pair. For example, if you specify `["f1", "f2"]`, the record "v1, v2" is converted to `{"f1":"v1","f2":"v2"}`.  
`delimiter`  
The string used as the delimiter in the record. The default is a comma (,).

`LOGTOJSON`  
Converts a record from a log format to JSON format. The supported log formats are **Apache Common Log**, **Apache Combined Log**, **Apache Error Log**, and **RFC3164 Syslog**.  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "logformat",
    "matchPattern": "yourregexpattern",
    "customFieldNames": [ "field1", "field2", … ]
}
```  
`logFormat`  
[Required] The log entry format. The following are possible values:  
+ `COMMONAPACHELOG` — The Apache Common Log format. Each log entry has the following pattern by default: "`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}`".
+ `COMBINEDAPACHELOG` — The Apache Combined Log format. Each log entry has the following pattern by default: "`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}`".
+ `APACHEERRORLOG` — The Apache Error Log format. Each log entry has the following pattern by default: "`[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}`".
+ `SYSLOG` — The RFC3164 Syslog format. Each log entry has the following pattern by default: "`%{timestamp} %{hostname} %{program}[%{processid}]: %{message}`".  
`matchPattern`  
Overrides the default pattern for the specified log format. Use this setting to extract values from log entries if they use a custom format. If you specify `matchPattern`, you must also specify `customFieldNames`.  
`customFieldNames`  
The custom field names used as keys in each JSON key value pair. You can use this setting to define field names for values extracted from `matchPattern`, or override the default field names of predefined log formats.

**Example : LOGTOJSON Configuration**  <a name="example-logtojson"></a>
Here is one example of a `LOGTOJSON` configuration for an Apache Common Log entry converted to JSON format:  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG"
}
```
Before conversion:  

```
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
```
After conversion:  

```
{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
```

**Example : LOGTOJSON Configuration With Custom Fields**  <a name="example-logtojson-custom-fields"></a>
Here is another example `LOGTOJSON` configuration:  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"]
}
```
With this configuration setting, the same Apache Common Log entry from the previous example is converted to JSON format as follows:  

```
{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
```

**Example : Convert Apache Common Log Entry**  <a name="example-apache-common-log-entry"></a>
The following flow configuration converts an Apache Common Log entry to a single-line record in JSON format:  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "deliveryStream": "my-delivery-stream",
            "dataProcessingOptions": [
                {
                    "optionName": "LOGTOJSON",
                    "logFormat": "COMMONAPACHELOG"
                }
            ]
        }
    ] 
}
```

**Example : Convert Multi-Line Records**  <a name="example-convert-multi-line"></a>
The following flow configuration parses multi-line records whose first line starts with "`[SEQUENCE=`". Each record is first converted to a single-line record. Then, values are extracted from the record based on a tab delimiter. Extracted values are mapped to specified `customFieldNames` values to form a single-line record in JSON format.  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "deliveryStream": "my-delivery-stream",
            "multiLineStartPattern": "\\[SEQUENCE=",
            "dataProcessingOptions": [
                {
                    "optionName": "SINGLELINE"
                },
                {
                    "optionName": "CSVTOJSON",
                    "customFieldNames": [ "field1", "field2", "field3" ],
                    "delimiter": "\\t"
                }
            ]
        }
    ] 
}
```

**Example : LOGTOJSON Configuration with Match Pattern**  <a name="example-logtojson-match-pattern"></a>
Here is one example of a `LOGTOJSON` configuration for an Apache Common Log entry converted to JSON format, with the last field (bytes) omitted:  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})",
    "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"]
}
```
Before conversion:  

```
123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200
```
After conversion:  

```
{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}
```

# Use common Agent CLI commands
<a name="cli-commands"></a>

The following table provides a set of common use cases and corresponding commands for working with Kinesis Agent. 


| Use case | Command | 
| --- | --- | 
|  Automatically start the agent on system start up  |  <pre>sudo chkconfig aws-kinesis-agent on</pre>  | 
|  Check the status of the agent  |  <pre>sudo service aws-kinesis-agent status</pre>  | 
|  Stop the agent  |  <pre>sudo service aws-kinesis-agent stop</pre>  | 
|  Read the agent's log file from this location  |  <pre>/var/log/aws-kinesis-agent/aws-kinesis-agent.log</pre>  | 
|  Uninstall the agent  |  <pre>sudo yum remove aws-kinesis-agent</pre>  | 

# Troubleshoot issues when sending from Kinesis Agent
<a name="agent-faq"></a>

This table provides troubleshooting information and solutions for common issues faced when using the Amazon Kinesis Agent. 


| Issue | Solution | 
| --- | --- | 
| Why does Kinesis Agent not work on Windows? |  [Kinesis Agent for Windows](https://docs.aws.amazon.com/kinesis-agent-windows/latest/userguide/what-is-kinesis-agent-windows.html) is different software than Kinesis Agent for Linux platforms.  | 
| Why is Kinesis Agent slowing down and/or RecordSendErrors increasing? |  This is usually due to throttling from Kinesis. Check the `WriteProvisionedThroughputExceeded` metric for Kinesis Data Streams or the `ThrottledRecords` metric for Firehose streams. Any increase from 0 in these metrics indicates that the stream limits need to be increased. For more information, see [Kinesis Data Stream limits](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) and [Firehose streams](https://docs.aws.amazon.com/firehose/latest/dev/limits.html). After you rule out throttling, check whether Kinesis Agent is configured to tail a large number of small files. There is a delay when Kinesis Agent starts tailing a new file, so Kinesis Agent should tail a small number of larger files. Try consolidating your log files into larger files.  | 
| How do I resolve java.lang.OutOfMemoryError exceptions? | This happens when Kinesis Agent does not have enough memory to handle its current workload. Try increasing `JAVA_START_HEAP` and `JAVA_MAX_HEAP` in /usr/bin/start-aws-kinesis-agent and restarting the agent. | 
| How do I resolve `IllegalStateException: connection pool shut down` exceptions? | Kinesis Agent does not have enough connections to handle its current workload. Try increasing maxConnections and maxSendingThreads in your general agent configuration settings at /etc/aws-kinesis/agent.json, as shown in the example after this table. The default value for these fields is 12 times the runtime processors available. See [AgentConfiguration.java](https://github.com/awslabs/amazon-kinesis-agent/blob/master/src/com/amazon/kinesis/streaming/agent/config/AgentConfiguration.java) for more about advanced agent configuration settings.  | 
| How can I debug another issue with Kinesis Agent? | You can enable DEBUG level logs in /etc/aws-kinesis/log4j.xml. | 
| How should I configure Kinesis Agent? | The smaller the maxBufferSizeBytes, the more frequently Kinesis Agent sends data. This can decrease the delivery time of records, but it also increases the number of requests per second to Kinesis.  | 
| Why is Kinesis Agent sending duplicate records? | This occurs due to a misconfiguration in file tailing. Make sure that each fileFlow’s filePattern matches only one file. This can also occur if logrotate is running in copytruncate mode. Try changing the mode to the default or create mode to avoid duplication. For more information on handling duplicate records, see [Handling Duplicate Records](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-duplicates.html). | 
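
For example, to address the connection pool and buffering questions above, the general and flow settings in `/etc/aws-kinesis/agent.json` might be adjusted along the lines of the following sketch. The values are illustrative only; choose them based on your workload and the advanced settings documented in AgentConfiguration.java.

```
{
    "maxConnections": 24,
    "maxSendingThreads": 24,
    "flows": [
        {
            "filePattern": "/tmp/app.log*",
            "deliveryStream": "yourdeliverystream",
            "maxBufferSizeBytes": 1048576
        }
    ]
}
```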

# Send data with AWS SDK
<a name="writing-with-sdk"></a>

You can use the [Amazon Data Firehose API](https://docs.aws.amazon.com/firehose/latest/APIReference/) to send data to a Firehose stream using the [AWS SDK for Java](https://aws.amazon.com/sdk-for-java/), [.NET](https://aws.amazon.com/sdk-for-net/), [Node.js](https://aws.amazon.com/sdk-for-javascript/), [Python](https://aws.amazon.com/sdk-for-python/), or [Ruby](https://aws.amazon.com/sdk-for-ruby/). If you are new to Amazon Data Firehose, take some time to become familiar with the concepts and terminology presented in [What is Amazon Data Firehose?](what-is-this-service.md). For more information, see [Start Developing with Amazon Web Services](http://aws.amazon.com/developers/getting-started/).

These examples do not represent production-ready code: they do not check for all possible exceptions or account for all possible security or performance considerations. 

The Amazon Data Firehose API offers two operations for sending data to your Firehose stream: [PutRecord](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecord.html) and [PutRecordBatch](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html). `PutRecord()` sends one data record within one call and `PutRecordBatch()` can send multiple data records within one call. 

## Single write operations using PutRecord
<a name="putrecord"></a>

Putting data requires only the Firehose stream name and a byte buffer (<=1000 KB). Because Amazon Data Firehose batches multiple records before loading the file into Amazon S3, you may want to add a record separator. To put data one record at a time into a Firehose stream, use the following code:

```
import java.nio.ByteBuffer;

import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;

PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setDeliveryStreamName(deliveryStreamName);

// Append a newline as a record separator so that individual records can be
// distinguished after Firehose batches them into a single object.
String data = line + "\n";

Record record = new Record().withData(ByteBuffer.wrap(data.getBytes()));
putRecordRequest.setRecord(record);

// Put the record into the Firehose stream.
firehoseClient.putRecord(putRecordRequest);
```

For more code context, see the sample code included in the AWS SDK. For information about request and response syntax, see the relevant topic in [Firehose API Operations](https://docs.aws.amazon.com/firehose/latest/APIReference/API_Operations.html).

## Batch write operations using PutRecordBatch
<a name="putrecordbatch"></a>

Putting data requires only the Firehose stream name and a list of records. Because Amazon Data Firehose batches multiple records before loading the file into Amazon S3, you may want to add a record separator. To put data records in batches into a Firehose stream, use the following code:

```
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;

PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);

// recordList is a List<Record> that you have populated with up to 500 records.
putRecordBatchRequest.setRecords(recordList);

// Put the batch of records into the Firehose stream. The maximum number of
// records in a single PutRecordBatch request is 500.
firehoseClient.putRecordBatch(putRecordBatchRequest);

// Clear the list so that it can be reused for the next batch.
recordList.clear();
```

For more code context, see the sample code included in the AWS SDK. For information about request and response syntax, see the relevant topic in [Firehose API Operations](https://docs.aws.amazon.com/firehose/latest/APIReference/API_Operations.html).

# Send CloudWatch Logs to Firehose
<a name="writing-with-cloudwatch-logs"></a>

CloudWatch Logs events can be sent to Firehose using CloudWatch subscription filters. For more information, see [Subscription filters with Amazon Data Firehose](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#FirehoseExample).

CloudWatch Logs events are sent to Firehose in compressed gzip format. If you want to deliver decompressed log events to Firehose destinations, you can use the decompression feature in Firehose to automatically decompress CloudWatch Logs. 

**Important**  
Currently, Firehose does not support the delivery of CloudWatch Logs to Amazon OpenSearch Service destination because Amazon CloudWatch combines multiple log events into one Firehose record and Amazon OpenSearch Service cannot accept multiple log events in one record. As an alternative, you can consider [Using subscription filter for Amazon OpenSearch Service in CloudWatch Logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_OpenSearch_Stream.html).

# Decompress CloudWatch Logs
<a name="writing-with-cloudwatch-logs-decompression"></a>

If you are using Firehose to deliver CloudWatch Logs and want to deliver decompressed data to your Firehose stream destination, use Firehose [Data Format Conversion](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html) (Parquet, ORC) or [Dynamic partitioning](https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html). You must enable decompression for your Firehose stream.

You can enable decompression using the AWS Management Console, AWS Command Line Interface or AWS SDKs.

**Note**  
If you enable the decompression feature on a stream, use that stream exclusively for CloudWatch Logs subscriptions filters, and not for Vended Logs. If you enable the decompression feature on a stream that is used to ingest both CloudWatch Logs and Vended Logs, the Vended Logs ingestion to Firehose fails. This decompression feature is only for CloudWatch Logs.

# Extract message after decompression of CloudWatch Logs
<a name="Message_extraction"></a>

When you enable decompression, you have the option to also enable message extraction. When using message extraction, Firehose filters out all metadata, such as owner, loggroup, logstream, and others, from the decompressed CloudWatch Logs records and delivers only the content inside the message fields. If you are delivering data to a Splunk destination, you must turn on message extraction for Splunk to parse the data. The following are sample outputs after decompression, without and with message extraction.

Fig 1: Sample output after decompression without message extraction:

```
{
    "owner": "111111111111",
    "logGroup": "CloudTrail/logs",
    "logStream": "111111111111_CloudTrail/logs_us-east-1",
    "subscriptionFilters": [
        "Destination"
    ],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {
            "id": "31953106606966983378809025079804211143289615424298221568",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root1\"}}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221569",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root2\"}}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221570",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root3\"}}"
        }
    ]
}
```

Fig 2: Sample output after decompression with message extraction:

```
{"eventVersion":"1.03","userIdentity":{"type":"Root1"}}
{"eventVersion":"1.03","userIdentity":{"type":"Root2"}}
{"eventVersion":"1.03","userIdentity":{"type":"Root3"}}
```

# Enable decompression on a new Firehose stream from console
<a name="writing-with-cloudwatch-logs-decompression-enabling-console"></a>

**To enable decompression on a new Firehose stream using the AWS Management Console**

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Amazon Data Firehose** in the navigation pane.

1. Choose **Create Firehose stream**.

1. Under **Choose source and destination**, specify the following settings.  
**Source**  
The source of your Firehose stream. Choose one of the following sources:  
   + **Direct PUT** – Choose this option to create a Firehose stream that producer applications write to directly. For a list of AWS services, agents, and open source services that are integrated with Direct PUT in Firehose, see [this section](create-name.md).
   + **Kinesis stream** – Choose this option to configure a Firehose stream that uses a Kinesis data stream as a data source. You can then use Firehose to read data easily from an existing Kinesis data stream and load it into destinations. For more information, see [Writing to Firehose Using Kinesis Data Streams](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html).  
**Destination**  
The destination of your Firehose stream. Choose one of the following:  
   + Amazon S3
   + Splunk

1. Under **Firehose stream name**, enter a name for your stream.

1. (Optional) Under **Transform records**:
   + In the **Decompress source records from Amazon CloudWatch Logs** section, choose **Turn on decompression**.
   + If you want to use message extraction after decompression, choose **Turn on message extraction**.

# Enable decompression on an existing Firehose stream
<a name="enabling-decompression-existing-stream-console"></a>

This section provides instructions for enabling decompression on existing Firehose streams. It covers two scenarios – streams with Lambda processing disabled and streams with Lambda processing already enabled. The following sections outline step-by-step procedures for each case, including the creation or modification of Lambda functions, updating Firehose settings, and monitoring CloudWatch metrics to ensure successful implementation of the built-in Firehose decompression feature.

## Enabling decompression when Lambda processing is disabled
<a name="enabling-decomp-exist-stream-lam-disable"></a>

To enable decompression on an existing Firehose stream with Lambda processing disabled, you must first enable Lambda processing. This requirement applies only to existing streams. The following steps show how to enable decompression on existing streams that do not have Lambda processing enabled.

1. Create a Lambda function. You can either create a simple pass-through function that returns records unchanged, or use this [blueprint](https://github.com/aws-samples/aws-kinesis-firehose-resources/tree/main/blueprints/kinesis-firehose-cloudwatch-logs-processor) to create a new Lambda function. 

1. Update your current Firehose stream to enable Lambda processing and use the Lambda function that you created for processing.

1. After you update the stream with the new Lambda function, go back to the Firehose console and enable decompression.

1. Disable the Lambda processing that you enabled in step 1. You can now delete the function that you created in step 1.

## Enabling decompression when Lambda processing is enabled
<a name="enabling-decomp-exist-stream-lam-enable"></a>

If your Firehose stream already uses a Lambda function to perform decompression, you can replace that function with the built-in Firehose decompression feature. Before you proceed, review your Lambda function code to confirm that it only performs decompression or message extraction. The output of your Lambda function should look similar to the examples shown in [Fig 1 or Fig 2](Message_extraction.md). If it does, you can replace the Lambda function using the following steps.

1. Replace your current Lambda function with this [blueprint](https://github.com/aws-samples/aws-kinesis-firehose-resources/tree/main/blueprints/kinesis-firehose-cloudwatch-logs-processor). The new blueprint Lambda function automatically detects whether the incoming data is compressed or decompressed. It only performs decompression if its input data is compressed.

1. Turn on decompression using the built-in Firehose option for decompression.

1. Enable CloudWatch metrics for your Firehose stream if it's not already enabled. Monitor the metric `CloudWatchProcessorLambda_IncomingCompressedData` and wait until this metric changes to zero. This confirms that all input data sent to your Lambda function is decompressed and the Lambda function is no longer required.

1. Remove the Lambda data transformation because you no longer need it to decompress your stream.

# Disable decompression on Firehose stream
<a name="writing-with-cloudwatch-logs-decompression-disabling-console"></a>

**To disable decompression on a Firehose stream using the AWS Management Console**

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Amazon Data Firehose** in the navigation pane.

1. Choose the Firehose stream that you want to edit.

1. On the **Firehose stream details** page, choose the **Configuration** tab.

1. In the **Transform and convert records** section, choose **Edit**.

1. Under **Decompress source records from Amazon CloudWatch Logs**, clear **Turn on decompression** and then choose **Save changes**.

# Troubleshoot decompression in Firehose
<a name="decomp-faq"></a>

The following table shows how Firehose handles errors during data decompression and processing, including delivering records to an error S3 bucket, logging errors, and emitting metrics. It also explains the error message returned for unauthorized data put operations.


| Issue | Solution | 
| --- | --- | 
| What happens to the source data in case of an error during decompression? |  If Amazon Data Firehose cannot decompress the record, the record is delivered as is (in compressed format) to the error S3 bucket that you specified when you created the Firehose stream. Along with the record, the delivered object includes the error code and error message, and these objects are delivered to an S3 bucket prefix called `decompression-failed`. Firehose continues to process other records after a failed decompression of a record.  | 
| What happens to the source data in case of an error in the processing pipeline after successful decompression? |  If Amazon Data Firehose encounters an error in the processing steps after decompression, such as Dynamic Partitioning or Data Format Conversion, the record is delivered in compressed format to the error S3 bucket that you specified when you created the Firehose stream. Along with the record, the delivered object also includes the error code and error message.  | 
| How are you informed in case of an error or an exception? |  If an error or exception occurs during decompression and you have configured CloudWatch Logs, Firehose logs error messages to CloudWatch Logs. Additionally, Firehose sends metrics to CloudWatch that you can monitor. You can also optionally create alarms based on the metrics that Firehose emits.  | 
| What happens when put operations don't come from CloudWatch Logs? | When put operations do not come from CloudWatch Logs, Firehose returns the following error message: <pre>Put to Firehose failed for AccountId: <accountID>, FirehoseName:  <firehosename> because the request is not originating from allowed source types.</pre> | 
| What metrics does Firehose emit for the decompression feature? | Firehose emits metrics for decompression of every record. Select the period (1 minute), the statistic (sum), and a date range to get the number of `DecompressedRecords` or `DecompressedBytes` that failed or succeeded. For more information, see [CloudWatch Logs Decompression Metrics](monitoring-with-cloudwatch-metrics.md#decompression-metrics-cw). | 

# Send CloudWatch Events to Firehose
<a name="writing-with-cloudwatch-events"></a>

You can configure Amazon CloudWatch to send events to a Firehose stream by adding a target to a CloudWatch Events rule.

**To create a target for a CloudWatch Events rule that sends events to an existing Firehose stream**

1. Sign in to the AWS Management Console and open the CloudWatch console at [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Choose **Create rule**.

1. On the **Step 1: Create rule** page, for **Targets**, choose **Add target**, and then choose **Firehose stream**.

1. Choose an existing **Firehose stream**.

For more information about creating CloudWatch Events rules, see [Getting Started with Amazon CloudWatch Events](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/CWE_GettingStarted.html).

# Configure AWS IoT to send data to Firehose
<a name="writing-with-iot"></a>

You can configure AWS IoT to send information to a Firehose stream by adding an action.

**To create an action that sends events to an existing Firehose stream**

1. When creating a rule in the AWS IoT console, on the **Create a rule** page, under **Set one or more actions**, choose **Add action**.

1. Choose **Send messages to an Amazon Kinesis Firehose stream**.

1. Choose **Configure action**.

1. For **Stream name**, choose an existing Firehose stream. 

1. For **Separator**, choose a separator character to be inserted between records.

1. For **IAM role name**, choose an existing IAM role or choose **Create a new role**.

1. Choose **Add action**.

For more information about creating AWS IoT rules, see [AWS IoT Rule Tutorials](https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules-tutorial.html).