

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 向 Firehose 流发送数据
<a name="basic-write"></a>

本节介绍如何使用不同的数据源将数据发送到您的 Firehose 流。如果您不了解 Amazon Data Firehose，请花点时间熟悉 [什么是 Amazon Data Firehose？](what-is-this-service.md) 中介绍的概念和术语。

**注意**  
某些 AWS 服务只能向位于同一区域的 Firehose 直播发送消息和事件。如果在为亚马逊 CloudWatch 日志、 CloudWatch 事件或配置目标时，您的 Firehose 直播未作为选项出现 AWS IoT，请验证您的 Firehose 直播是否与其他服务位于同一区域。有关每个区域的服务端点的信息，请参阅 [Amazon Data Firehose 端点](https://docs.aws.amazon.com/general/latest/gr/fh.html#fh_region)。

您可以将数据从以下数据来源发送到 Firehose 流。

**Topics**
+ [配置 Kinesis 代理以发送数据](writing-with-agents.md)
+ [使用 AWS SDK 发送数据](writing-with-sdk.md)
+ [向 Firehose 发送 CloudWatch 日志](writing-with-cloudwatch-logs.md)
+ [向 Fire CloudWatch hose 发送事件](writing-with-cloudwatch-events.md)
+ [配置 AWS IoT 为向 Firehose 发送数据](writing-with-iot.md)

# 配置 Kinesis 代理以发送数据
<a name="writing-with-agents"></a>

Amazon Kinesis 代理是独立的 Java 软件应用程序，可用作参考实施，以显示如何收集数据并将其发送到 Firehose。此代理持续监控一组文件，并将新数据发送到您的 Firehose 流。代理显示您如何处理文件轮换、检查点操作并在失败时重试。它向您显示如何以可靠、及时且简单的方法传输您的数据。它还显示了如何发布 CloudWatch指标以更好地监控流媒体过程并对其进行故障排除。要了解更多信息，请访问 [awslabs/ amazon-kinesis-agent](https://github.com/awslabs/amazon-kinesis-agent)。

默认情况下，会基于换行符 (`'\n'`) 分析每个文件中的记录。但是，也可以将代理配置为分析多行记录（请参阅 [指定代理配置设置](agent-config-settings.md)）。

您可以在基于 Linux 的服务器环境（如 Web 服务器、日志服务器和数据库服务器）上安装此代理。在安装代理后，通过指定要监控的文件和数据的 Firehose 流来配置代理。在配置好代理之后，代理将持续从这些文件中收集数据并以可靠的方式将数据发送到 Firehose 流。

## 先决条件
<a name="prereqs"></a>

在开始使用 Kinesis 代理之前，请确保您满足以下先决条件。
+ 您的操作系统必须是 Amazon Linux 或 Red Hat Enterprise Linux 7 或更高版本。
+ 代理 2.0.0 或更高版本使用 JRE 1.8 或更高版本运行。代理 1.1.x 使用 JRE 1.7 或更高版本运行。
+ 如果您使用 Amazon EC2 运行代理，请启动 EC2 实例。
+ 您指定的 IAM 角色或 AWS 证书必须具有执行 Amazon Data Firehose [PutRecordBatch](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html)操作的权限，代理才能将数据发送到您的 Firehose 流。如果您为代理启用 CloudWatch 监控，则还需要执行该 CloudWatch [PutMetricData](https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html)操作的权限。有关更多信息，请参阅[使用 Amazon Data Firehose 控制访问权限](controlling-access.md)[监控 Kinesis 代理运行状况](agent-health.md)、和 [Amazon 的身份验证和访问控制 CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/auth-and-access-control-cw.html)。

# 管理 AWS 凭证
<a name="agent-credentials"></a>

使用以下方法之一管理您的 AWS 证书：
+ 创建自定义凭证提供程序。有关更多信息，请参阅 [创建自定义凭证提供程序](custom-cred-provider.md)。
+ 当您启动您的 EC2 实例时指定该 IAM 角色。
+ 在配置代理时指定 AWS 凭据（参见下方配置表`awsSecretAccessKey`中的`awsAccessKeyId`和条目[指定代理配置设置](agent-config-settings.md)）。
+ 编辑`/etc/sysconfig/aws-kinesis-agent`以指定您的 AWS 地区和 AWS 访问密钥。
+ 如果您的 EC2 实例位于不同的 AWS 账户中，请创建一个 IAM 角色以提供对 Amazon Data Firehose 服务的访问权限。[在配置代理时指定该角色（参见 Assume [Learn 和](agent-config-settings.md#assumeRoleARN) IdassumeRoleExternal）。](agent-config-settings.md#assumeRoleExternalId)使用前面的方法之一来指定另一个账户中有权担任此角色的用户的 AWS 证书。

# 创建自定义凭证提供程序
<a name="custom-cred-provider"></a>

您可以创建自定义凭证提供程序，并在以下配置设置中为 Kinesis 代理提供其类名和 jar 路径：`userDefinedCredentialsProvider.classname` 和 `userDefinedCredentialsProvider.location`。有关这两个配置设置的说明，请参阅[指定代理配置设置](agent-config-settings.md)。

要创建自定义凭证提供程序，请定义一个实现 `AWS CredentialsProvider` 接口的类，如下例所示。

```
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

public class YourClassName implements AWSCredentialsProvider {
    public YourClassName() {
    }

    public AWSCredentials getCredentials() {
        return new BasicAWSCredentials("key1", "key2");
    }

    public void refresh() {
    }
}
```

您的类必须有一个不带参数的构造函数。

AWS 定期调用刷新方法以获取更新的凭证。如果希望凭证提供程序在其整个生命周期内提供不同的凭证，请在此方法中包含用于刷新凭证的代码。或者，如果您需要提供静态（不更改）凭证的凭证提供程序，则可以将此方法保留为空。

# 下载并安装代理
<a name="download-install"></a>

首先，连接到您的实例。有关详细信息，请参阅《Amazon EC2 用户指南》**中的[连接到您的实例](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-connect-to-instance-linux.html)。如果您在连接时遇到问题，请参阅《Amazon EC2 用户指南》**中的[排查 Amazon EC2 Linux 实例的连接问题](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/TroubleshootingInstancesConnecting.html)。

接下来，请使用以下方法之一安装代理。
+ **要从 Amazon Linux 存储库设置代理**

  此方法仅适用于 Amazon Linux 实例。使用以下命令：

  ```
  sudo yum install –y aws-kinesis-agent
  ```

  Agent v 2.0.0 或更高版本安装在装有 Amazon Linux 2 () AL2 操作系统的计算机上。此代理版本需要安装 Java 1.8 或更高版本。如果尚未安装所需的 Java 版本，代理安装程序将会安装。有关亚马逊 Linux 2 的更多信息，请参阅[https://aws.amazon.com/amazon-linux-2/](https://aws.amazon.com/amazon-linux-2/)。
+ **从 Amazon S3 存储库设置代理**

  此方法从公开可用的存储库安装代理，因此适用于 Red Hat Enterprise Linux 以及 Amazon Linux 2 实例。使用以下命令下载并安装最新版本的代理 2.x.x：

  ```
  sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
  ```

  要安装特定版本的代理，请在命令中指定版本号。例如，以下命令将安装代理 2.0.1。

  ```
  sudo yum install –y https://streaming-data-agent.s3.amazonaws.com/aws-kinesis-agent-2.0.1-1.amzn1.noarch.rpm
  ```

  如果您使用的是 Java 1.7，但不想升级，则可以下载与 Java 1.7 兼容的代理 1.x.x。例如，要下载代理 1.1.6，可使用以下命令：

  ```
  sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-1.1.6-1.amzn1.noarch.rpm
  ```

  您可以使用以下命令下载最新代理

  ```
  sudo yum install -y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
  ```
+ **从 GitHub repo 中设置代理**

  1. 首先，确保您已安装所需的 Java 版本，具体取决于代理版本。

  1.  从 [awslabs amazon-kinesis-agent](https://github.com/awslabs/amazon-kinesis-agent) GitHub /存储库下载代理。

  1. 导航到下载目录并运行以下命令来安装代理：

     ```
     sudo ./setup --install
     ```
+ 

**在 Docker 容器中设置代理**  
Kinesis 代理可以在容器中运行，也可以通过 [amazonlinux](https://docs.aws.amazon.com/AmazonECR/latest/userguide/amazon_linux_container_image.html) 容器库运行。使用以下 Dockerfile，然后运行 `docker build`。

  ```
  FROM amazonlinux
  
  RUN yum install -y aws-kinesis-agent which findutils
  COPY agent.json /etc/aws-kinesis/agent.json
  
  CMD ["start-aws-kinesis-agent"]
  ```

# 配置并启动代理
<a name="config-start"></a>

**配置并启动代理**

1. 打开并编辑配置文件（如果使用默认文件访问权限，则以超级用户的身份来执行）：`/etc/aws-kinesis/agent.json`

   在此配置文件中，指定代理从中收集数据的文件（`"filePattern"`）以及代理将数据发送到的 Firehose 流的名称（`"deliveryStream"`）。文件名是一种模式，并且代理能够识别文件轮换。每秒内您轮换使用文件或创建新文件的次数不能超过一次。代理使用文件创建时间戳来确定要跟踪并送入 Firehose 流中的文件。如果每秒创建新文件或轮换使用文件的次数超过一次，代理将无法正确区分这些文件。

   ```
   { 
      "flows": [
           { 
               "filePattern": "/tmp/app.log*", 
               "deliveryStream": "yourdeliverystream"
           } 
      ] 
   }
   ```

   默认 AWS 区域为`us-east-1`。如果使用的是其他区域，请将 `firehose.endpoint` 设置添加到配置文件，为区域指定终端节点。有关更多信息，请参阅 [指定代理配置设置](agent-config-settings.md)。

1. 手动启动代理：

   ```
   sudo service aws-kinesis-agent start
   ```

1. （可选）将代理配置为在系统启动时启动：

   ```
   sudo chkconfig aws-kinesis-agent on
   ```

现在，代理作为系统服务在后台运行。它会持续监控指定的文件，并将数据发送到指定的 Firehose 流。代理活动记录在 `/var/log/aws-kinesis-agent/aws-kinesis-agent.log` 中。

# 指定代理配置设置
<a name="agent-config-settings"></a>

代理支持两个必需的配置设置，即 `filePattern` 和 `deliveryStream`，以及可用于其他功能的可选配置设置。您可以在 `/etc/aws-kinesis/agent.json` 中指定必需配置设置和可选配置设置。

只要您更改配置文件，就必须使用以下命令停止再启动代理：

```
sudo service aws-kinesis-agent stop
sudo service aws-kinesis-agent start
```

或者，您也可以使用以下命令：

```
sudo service aws-kinesis-agent restart
```

一般的设置配置如下。


| 配置设置 | 说明 | 
| --- | --- | 
| <a name="assumeRoleARN"></a>assumeRoleARN |  用户应承担的角色的 Amazon 资源名称 (ARN)。有关更多信息，请参阅 IAM *用户指南*中的[使用 IAM 角色委派跨 AWS 账户访问权限](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_cross-account-with-roles.html)。  | 
| <a name="assumeRoleExternalId"></a>assumeRoleExternalId |  确定谁可以担任该角色的可选标识符。有关更多信息，请参阅《IAM 用户指南》**中的[如何使用外部 ID](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html)。  | 
| <a name="awsAccessKeyId"></a>awsAccessKeyId |  AWS 覆盖默认凭证的访问密钥 ID。此设置优先于所有其他凭证提供程序。  | 
| <a name="awsSecretAccessKey"></a>awsSecretAccessKey |  AWS 覆盖默认凭证的密钥。此设置优先于所有其他凭证提供程序。  | 
| cloudwatch.emitMetrics |   CloudWatch 如果已设置，则允许代理向其发送指标 (true)。 默认：True  | 
| cloudwatch.endpoint |  的区域终端节点 CloudWatch。 默认值：`monitoring.us-east-1.amazonaws.com`  | 
| firehose.endpoint |  Amazon Data Firehose 的区域端点。 默认值：`firehose.us-east-1.amazonaws.com`  | 
| sts.endpoint |   AWS 安全令牌服务的区域终端节点。 默认值：`https://sts.amazonaws.com`  | 
| userDefinedCredentialsProvider.classname | 如果定义自定义凭证提供程序，请使用此设置提供其完全限定类名。不要在类名末尾包含 .class。 | 
| userDefinedCredentialsProvider.location | 如果定义自定义凭证提供程序，请使用此设置指定包含自定义凭证提供程序的 jar 的绝对路径。代理还在以下位置查找 jar 文件：/usr/share/aws-kinesis-agent/lib/。 | 

流配置设置如下。


| 配置设置 | 说明 | 
| --- | --- | 
| aggregatedRecordSizeBytes |  要使代理聚合记录，然后在一个操作中将这些记录放入 Firehose 流，请指定此设置。将此项设置为所需的大小，您希望聚合记录达到该大小后，代理将记录放入 Firehose 流。 默认值：0（不聚合）  | 
| dataProcessingOptions |  在将每个被分析的记录发送到 Firehose 流之前应用于这些记录的处理选项的列表。这些处理选项按指定的顺序执行。有关更多信息，请参阅 [使用代理对数据进行预处理](pre-processing.md)。  | 
| deliveryStream |  [必需] Firehose 流名称。  | 
| filePattern |  [必需] 需要由代理监控的文件的 Glob。与此模式匹配的任何文件都会自动由代理挑选出来并进行监控。对于匹配此模式的所有文件，请向 `aws-kinesis-agent-user` 授予读取权限。对于包含这些文件的目录，请向 `aws-kinesis-agent-user` 授予读取和执行权限。  代理将选择与此模式匹配的任何文件。为了确保代理不会选择意外的记录，请仔细选择此模式。   | 
| initialPosition |  开始解析文件的初始位置。有效值为 `START_OF_FILE` 和 `END_OF_FILE`。 默认值：`END_OF_FILE`  | 
| maxBufferAgeMillis |  代理在将数据发送到 Firehose 流之前缓冲数据的最长时间（以毫秒计）。 值范围：1,000-900,000（1 秒到 15 分钟） 默认值：60,000（1 分钟）  | 
| maxBufferSizeBytes |  代理在将数据发送到 Firehose 流之前缓冲的数据的最大大小（以字节计）。 值范围：1-4,194,304（4MB） 默认值：4194304 (4 MB)  | 
| maxBufferSizeRecords |  代理在将数据发送到 Firehose 流之前缓冲数据的最大记录数。 值范围：1-500 默认值：500  | 
| minTimeBetweenFilePollsMillis |  代理轮询和分析受监控文件中是否有新数据的时间间隔（以毫秒计）。 值范围：1 或更大值 默认值：100  | 
| multiLineStartPattern |  用于标识记录开始的模式。记录由与模式匹配的行以及与模式不匹配的任何以下行组成。有效值为正则表达式。默认情况下，日志文件中的每一个新行都被解析为一条记录。  | 
| skipHeaderLines |  代理从受监控文件开头跳过分析的行数。 值范围：0 或更大值 默认值：0（零）  | 
| truncatedRecordTerminator |  在记录大小超过 Amazon Data Firehose 记录大小限制时，代理用来截断已解析记录的字符串。(1000 KB) 默认值：`'\n'`（换行符）  | 

# 配置多个文件目录和流
<a name="sim-writes"></a>

通过指定多个流程配置设置，您可以配置代理以监控多个文件目录并将数据发送到多个流。在以下配置示例中，代理监控两个文件目录，并将数据分别发送到 Kinesis 数据流和 Firehose 流。您可以为 Kinesis Data Streams 和 Amazon Data Firehose 指定不同的端点，这样您的数据流和 Firehose 流就不需要位于同一区域。

```
{
    "cloudwatch.emitMetrics": true,
    "kinesis.endpoint": "https://your/kinesis/endpoint", 
    "firehose.endpoint": "https://your/firehose/endpoint", 
    "flows": [
        {
            "filePattern": "/tmp/app1.log*", 
            "kinesisStream": "yourkinesisstream"
        }, 
        {
            "filePattern": "/tmp/app2.log*",
            "deliveryStream": "yourfirehosedeliverystream" 
        }
    ] 
}
```

有关在 Amazon Kinesis Data Streams 中使用代理的更多详细信息，请参阅[使用 Kinesis 代理写入 Amazon Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/writing-with-agents.html)。

# 使用代理对数据进行预处理
<a name="pre-processing"></a>

代理可以预处理从受监控文件分析的记录，然后再将这些记录发送到 Firehose 流。通过将 `dataProcessingOptions` 配置设置添加到您的文件流可以启用此功能。可以添加一个或多个处理选项，这些选项将按指定的顺序执行。

代理支持以下处理选项。由于代理是开源的，您可以进一步开发和扩展其处理选项。您可以从 [Kinesis 代理](https://github.com/awslabs/amazon-kinesis-agent)下载代理。处理选项

`SINGLELINE`  
通过移除换行符和首尾的空格，将多行记录转换为单行记录。  

```
{
    "optionName": "SINGLELINE"
}
```

`CSVTOJSON`  
将记录从分隔符分隔的格式转换为 JSON 格式。  

```
{
    "optionName": "CSVTOJSON",
    "customFieldNames": [ "field1", "field2", ... ],
    "delimiter": "yourdelimiter"
}
```  
`customFieldNames`  
[必需] 在每个 JSON 键值对中用作键的字段名称。例如，如果您指定 `["f1", "f2"]`，则记录“v1, v2”将转换为 `{"f1":"v1","f2":"v2"}`。  
`delimiter`  
在记录中用作分隔符的字符串。默认值为逗号（,）。

`LOGTOJSON`  
将记录从日志格式转换为 JSON 格式。支持的日志格式为 **Apache Common Log**、**Apache Combined Log**、**Apache Error Log** 和 **RFC3164 Syslog**。  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "logformat",
    "matchPattern": "yourregexpattern",
    "customFieldNames": [ "field1", "field2", … ]
}
```  
`logFormat`  
[必需] 日志条目格式。以下是可能的值：  
+ `COMMONAPACHELOG` – Apache 通用日志格式。默认情况下每个日志条目都为以下模式：“`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}`”。
+ `COMBINEDAPACHELOG` – Apache 组合日志格式。默认情况下每个日志条目都为以下模式：“`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}`”。
+ `APACHEERRORLOG` – Apache 错误日志格式。默认情况下每个日志条目都为以下模式：“`[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}`”。
+ `SYSLOG`— RFC3164 系统日志格式。默认情况下每个日志条目都为以下模式：“`%{timestamp} %{hostname} %{program}[%{processid}]: %{message}`”。  
`matchPattern`  
覆盖指定的日志格式的默认模式。如果日志条目使用自定义格式，则可以使用该设置提取日志条目中的值。如果指定 `matchPattern`，还必须指定 `customFieldNames`。  
`customFieldNames`  
在每个 JSON 键值对中用作键的自定义字段名称。您可以使用此设置定义从 `matchPattern` 中提取的值的字段名称，或覆盖预定义日志格式的默认字段名称。

**Example ：LOGTOJSON 配置**  <a name="example-logtojson"></a>
下面是一个转换为 JSON 格式的 Apache 通用日志条目的 `LOGTOJSON` 配置示例：  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG"
}
```
转换前：  

```
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
```
转换后：  

```
{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
```

**Example ：包含自定义字段的 LOGTOJSON 配置**  <a name="example-logtojson-custom-fields"></a>
下面是 `LOGTOJSON` 配置的另一个示例：  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"]
}
```
使用此配置设置时，上一个示例中的同一个 Apache 通用日志条目将如下转换为 JSON 格式：  

```
{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
```

**Example ：转换 Apache 通用日志条目**  <a name="example-apache-common-log-entry"></a>
以下流配置将一个 Apache 通用日志条目转换为 JSON 格式的单行记录：  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "deliveryStream": "my-delivery-stream",
            "dataProcessingOptions": [
                {
                    "optionName": "LOGTOJSON",
                    "logFormat": "COMMONAPACHELOG"
                }
            ]
        }
    ] 
}
```

**Example ：转换多行记录**  <a name="example-convert-multi-line"></a>
以下流配置分析第一行以“`[SEQUENCE=`”开头的多行记录。每个记录先转换为一个单行记录。然后，将基于制表分隔符从记录中提取值。提取的值映射到指定的 `customFieldNames` 值，从而形成 JSON 格式的单行记录。  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "deliveryStream": "my-delivery-stream",
            "multiLineStartPattern": "\\[SEQUENCE=",
            "dataProcessingOptions": [
                {
                    "optionName": "SINGLELINE"
                },
                {
                    "optionName": "CSVTOJSON",
                    "customFieldNames": [ "field1", "field2", "field3" ],
                    "delimiter": "\\t"
                }
            ]
        }
    ] 
}
```

**Example ：具有匹配模式的 LOGTOJSON 配置**  <a name="example-logtojson-match-pattern"></a>
下面是一个转换为 JSON 格式的 Apache 通用日志条目的 `LOGTOJSON` 配置示例，其中省略了最后一个字段 (bytes)：  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})",
    "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"]
}
```
转换前：  

```
123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200
```
转换后：  

```
{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}
```

# 使用常用的代理 CLI 命令
<a name="cli-commands"></a>

下表提供了一组使用 AWS Kinesis 代理的常见用例和相应的命令。


| 使用案例 | 命令 | 
| --- | --- | 
|  系统启动时自动启动代理  |  <pre>sudo chkconfig aws-kinesis-agent on</pre>  | 
|  检查代理的状态  |  <pre>sudo service aws-kinesis-agent status</pre>  | 
|  停止代理  |  <pre>sudo service aws-kinesis-agent stop</pre>  | 
|  从此位置读取代理的日志文件  |  <pre>/var/log/aws-kinesis-agent/aws-kinesis-agent.log</pre>  | 
|  卸载代理  |  <pre>sudo yum remove aws-kinesis-agent</pre>  | 

# 排查从 Kinesis 代理发送时出现的问题
<a name="agent-faq"></a>

此表提供了使用 Amazon Kinesis 代理时所遇到常见问题的问题排查信息和解决方案。


| 问题 | 解决方案 | 
| --- | --- | 
| 为什么 Kinesis 代理无法在 Windows 上运行？ |  [适用于 Windows 的 Kinesis 代理](https://docs.aws.amazon.com/kinesis-agent-windows/latest/userguide/what-is-kinesis-agent-windows.html)与适用于 Linux 平台的 Kinesis 代理是不同的软件。  | 
| 为什么 Kinesis 代理速度变慢且/或 RecordSendErrors 增加？ |  这通常是由于 Kinesis 节流造成的。查看 Kinesis Data Streams 的 `WriteProvisionedThroughputExceeded` 指标或 Firehose 流的 `ThrottledRecords` 指标。这些指标从 0 开始的任何增加都表示需要增加流限制。有关更多信息，请参阅 [Kinesis Data Stream limits](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) 和 [Firehose streams](https://docs.aws.amazon.com/firehose/latest/dev/limits.html)。 排除节流后，查看 Kinesis 代理是否配置为跟踪大量小文件。Kinesis 代理跟踪新文件时会有延迟，因此 Kinesis 代理应跟踪少量大文件。尝试将您的日志文件合并为大文件。  | 
| 如何解决 java.lang.OutOfMemoryError 异常？ | 当 Kinesis 代理没有足够的内存来处理当前的工作负载时会发生此情况。尝试增加 /usr/bin/start-aws-kinesis-agent 中的 JAVA\$1START\$1HEAP 和 JAVA\$1MAX\$1HEAP，并重新启动代理。 | 
| 如何解决 IllegalStateException : connection pool shut down 异常？ | Kinesis 代理没有足够的连接来处理当前的工作负载。尝试在 /etc/aws-kinesis/agent.json 的常规代理配置设置中增加 maxConnections 和 maxSendingThreads。这些字段的默认值是可用运行时系统处理器的 12 倍。有关高级代理配置设置的更多信息，请参见 [AgentConfiguration.java](https://github.com/awslabs/amazon-kinesis-agent/blob/master/src/com/amazon/kinesis/streaming/agent/config/AgentConfiguration.java)。 | 
| 如何调试 Kinesis 代理的其他问题？ | 可在 /etc/aws-kinesis/log4j.xml 中启用 DEBUG 级别日志。 | 
| 我应该如何配置 Kinesis 代理？ | maxBufferSizeBytes 越小，Kinesis 代理发送数据的频率就越高。这可能是好事，因为这减少了记录的传输时间，但也增加了每秒对 Kinesis 的请求。 | 
| 为什么 Kinesis 代理会发送重复记录？ | 这是由于文件跟踪中的错误配置造成的。确保每个 fileFlow’s filePattern 只匹配一个文件。如果正在使用的 logrotate 模式处于 copytruncate 模式，也可能发生这种情况。尝试将模式更改为默认模式或创建模式以避免重复。有关处理重复记录的更多信息，请参阅[处理重复记录](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-duplicates.html)。 | 

# 使用 AWS SDK 发送数据
<a name="writing-with-sdk"></a>

您可以使用 [Amazon Data Firehose API](https://docs.aws.amazon.com/firehose/latest/APIReference/) 通过[适用于 Java](https://aws.amazon.com/sdk-for-java/)、[.NET](https://aws.amazon.com/sdk-for-net/)、[Node.js](https://aws.amazon.com/sdk-for-javascript/)、[Python](https://aws.amazon.com/sdk-for-python/) 或 [Ruby](https://aws.amazon.com/sdk-for-ruby/) 的AWS SDK 将数据发送到 Firehose 流。如果您不了解 Amazon Data Firehose，请花点时间熟悉 [什么是 Amazon Data Firehose？](what-is-this-service.md) 中介绍的概念和术语。有关更多信息，请参阅[开始使用 Amazon Web Services 开发](https://aws.amazon.com/developers/getting-started/)。

这些示例并非可直接用于生产的代码，因为它们不会检查所有可能的异常，或者不会考虑到所有可能的安全或性能问题。

Amazon Data Firehose API 提供了两种用于向你的 Firehose 直播发送数据的操作：和。[PutRecord[PutRecordBatch](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html)](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecord.html) `PutRecord()`在一个呼叫中发送一条数据记录，并且`PutRecordBatch()`可以在一个呼叫中发送多条数据记录。

## 使用单次写入操作 PutRecord
<a name="putrecord"></a>

写入数据时，只需要 Firehose 流名称和字节缓冲区（<=1000 KB）。由于 Amazon Data Firehose 在将文件加载到 Amazon S3 之前会批量处理多条记录，因此可能需要添加记录分隔符。要以一次一条记录的方式向 Firehose 流写入数据，请使用以下代码：

```
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setDeliveryStreamName(deliveryStreamName);

String data = line + "\n";

Record record = new Record().withData(ByteBuffer.wrap(data.getBytes()));
putRecordRequest.setRecord(record);

// Put record into the DeliveryStream
firehoseClient.putRecord(putRecordRequest);
```

有关更多代码上下文，请参阅 AWS SDK 中包含的示例代码。有关请求和响应语法的信息，请参阅 [Firehose API Operations](https://docs.aws.amazon.com/firehose/latest/APIReference/API_Operations.html) 中的相关主题。

## 使用 Batch 写入操作 PutRecordBatch
<a name="putrecordbatch"></a>

写入数据时，只需要 Firehose 流名称和记录列表。由于 Amazon Data Firehose 在将文件加载到 Amazon S3 之前会批量处理多条记录，因此可能需要添加记录分隔符。要以批量方式向 Firehose 流写入数据记录，请使用以下代码：

```
PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
putRecordBatchRequest.setRecords(recordList);

// Put Record Batch records. Max No.Of Records we can put in a
// single put record batch request is 500
firehoseClient.putRecordBatch(putRecordBatchRequest);

recordList.clear();
```

有关更多代码上下文，请参阅 AWS SDK 中包含的示例代码。有关请求和响应语法的信息，请参阅 [Firehose API Operations](https://docs.aws.amazon.com/firehose/latest/APIReference/API_Operations.html) 中的相关主题。

# 向 Firehose 发送 CloudWatch 日志
<a name="writing-with-cloudwatch-logs"></a>

CloudWatch 可以使用 CloudWatch 订阅过滤器将日志事件发送到 Firehose。有关更多信息，请参阅[使用 Amazon Data Firehose 的订阅筛选条件](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#FirehoseExample)。

CloudWatch 日志事件以压缩的 gzip 格式发送到 Firehose。如果您想将解压缩后的日志事件传送到 Firehose 目标，可以使用 Firehose 中的解压缩功能自动解压缩日志。 CloudWatch 

**重要**  
目前，Firehose 不支持将 CloudWatch 日志传送到亚马逊 OpenSearch 服务目标，因为亚马逊 CloudWatch 将多个日志事件合并到一个 Firehose 记录中，而亚马逊 OpenSearch 服务无法在一条记录中接受多个日志事件。作为替代方案，您可以考虑[在 CloudWatch 日志中使用亚马逊 OpenSearch 服务的订阅筛选条件](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_OpenSearch_Stream.html)。

# 解压缩 CloudWatch 日志
<a name="writing-with-cloudwatch-logs-decompression"></a>

[如果您使用 Firehose 传送 CloudWatch 日志，并希望将解压缩后的数据传输到 Firehose 直播目标，请使用 Firehose [数据格式转换](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html)（Parquet、ORC）或动态分区。](https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html)您必须为 Firehose 流启用解压功能。

您可以使用 AWS 管理控制台、 AWS Command Line Interface 或 AWS SDKs启用解压功能。

**注意**  
如果您在直播上启用解压缩功能，请将该流专门用于 CloudWatch 日志订阅过滤器，而不是用于Vended Logs。如果您在用于同时采集日志和已售 CloudWatch 日志的流上启用解压缩功能，则向Firehose提取Vended Logs将失败。此解压缩功能仅适用于 CloudWatch 日志。

# 解压日志后提取消息 CloudWatch
<a name="Message_extraction"></a>

启用解压缩时，您可以选择同时启用消息提取。使用消息提取时，Firehose 会从解压缩的 CloudWatch 日志记录中筛选出所有元数据，例如所有者、日志组、日志流和其他元数据，并仅提供消息字段内的内容。如果您要将数据传输到 Splunk 目的地，则必须开启消息提取功能，Splunk 才能解析数据。以下是解压缩后的输出示例，无论是否具有消息提取。

图 1：没有消息提取的解压缩后的输出示例：

```
{
 "owner": "111111111111",
 "logGroup": "CloudTrail/logs",
 "logStream": "111111111111_CloudTrail/logs_us-east-1",
 "subscriptionFilters": [
 "Destination"
 ],
 "messageType": "DATA_MESSAGE",
 "logEvents": [
 {
 "id": "31953106606966983378809025079804211143289615424298221568",
 "timestamp": 1432826855000,
 "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root1\"}"
 },
 {
 "id": "31953106606966983378809025079804211143289615424298221569",
 "timestamp": 1432826855000,
 "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root2\"}"
 },
 {
 "id": "31953106606966983378809025079804211143289615424298221570",
 "timestamp": 1432826855000,
 "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root3\"}"
 }
 ]
}
```

图 2：有消息提取的解压缩后的输出示例：

```
{"eventVersion":"1.03","userIdentity":{"type":"Root1"}
{"eventVersion":"1.03","userIdentity":{"type":"Root2"}
{"eventVersion":"1.03","userIdentity":{"type":"Root3"}
```

# 通过控制台对新的 Firehose 流启用解压缩功能
<a name="writing-with-cloudwatch-logs-decompression-enabling-console"></a>

**要对新的 Firehose 直播启用解压功能，请使用 AWS 管理控制台**

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中选择 **Amazon Data Firehose**。

1. 选择**创建 Firehose 流**。

1. 在**选择源和目的地**下  
****源****  
您的 Firehose 流的源。请选择下列源之一：  
   + **Direct PUT**：选择此选项可创建 Firehose 流，供生产者应用程序直接写入。有关与 Firehose 中的 Direct PUT 集成的 AWS 服务、代理以及开源服务的列表，请参阅[此](create-name.md)部分。
   + **Kinesis 流**：选择此选项，以配置使用 Kinesis 数据流作为数据来源的 Firehose 流。然后，您可以使用 Firehose 从现有 Kinesis 数据流轻松读取数据，并将其加载到目的地。有关更多信息，请参阅 [Writing to Firehose Using Kinesis Data Streams](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html)  
****目标位置****  
Firehose 流的目的地。选择下列选项之一：  
   + Amazon S3
   + Splunk

1. 在 **Firehose 流名称**下，输入您的流名称。

1. （可选）在**转换记录**下：
   + 在 “**从 Amazon CloudWatch Logs 解压源记录” 部分中，选择**开启解**压缩**。
   + 如果要在解压缩后使用消息提取，则请选择**开启消息提取**。

# 在现有 Firehose 流上启用解压缩功能
<a name="enabling-decompression-existing-stream-console"></a>

本节提供了关于在现有 Firehose 流中启用解压缩的说明。它涵盖了两种场景——禁用 Lambda 处理的流和已经启用 Lambda 处理的流。以下各节概述了每种情况的 step-by-step程序，包括创建或修改 Lambda 函数、更新 Firehose 设置以及监控 CloudWatch 指标，以确保成功实施内置 Firehose 解压缩功能。

## 禁用 Lambda 处理时启用解压缩
<a name="enabling-decomp-exist-stream-lam-disable"></a>

要在禁用 Lambda 处理的情况下对现有 Firehose 流启用解压缩，必须先启用 Lambda 处理。此条件仅对现有流有效。以下步骤说明如何对未启用 Lambda 处理的现有流启用解压缩。

1. 创建一个 Lambda 函数。您可以创建虚拟记录通道，也可以使用此[蓝图](https://github.com/aws-samples/aws-kinesis-firehose-resources/tree/main/blueprints/kinesis-firehose-cloudwatch-logs-processor)创建新的 Lambda 函数。

1. 更新您当前的 Firehose 流以启用 Lambda 处理并使用您创建的 Lambda 函数进行处理。

1. 使用新的 Lambda 函数更新流后，请返回 Firehose 控制台并启用解压缩功能。

1. 禁用您在步骤 1 中启用的 Lambda 处理。现在，您可以删除已在步骤 1 中创建的函数。

## 启用 Lambda 处理时启用解压缩
<a name="enabling-decomp-exist-stream-lam-enable"></a>

如果您已经有带有 Lambda 函数的 Firehose 流，要执行解压缩，可以将其替换为 Firehose 解压缩功能。在继续操作之前，请查看您的 Lambda 函数代码，以确认它仅执行解压缩或消息提取。您的 Lambda 函数的输出应与[图 1 或图 2](Message_extraction.md) 中显示的示例类似。如果输出看起来相似，则您可以使用以下步骤替换 Lambda 函数。

1. 将您当前的 Lambda 函数替换为此[蓝图](https://github.com/aws-samples/aws-kinesis-firehose-resources/tree/main/blueprints/kinesis-firehose-cloudwatch-logs-processor)。新的蓝图 Lambda 函数会自动检测传入的数据是被压缩还是解压缩。它只有在输入数据被压缩时才会执行解压缩。

1. 使用内置的 Firehose 选项开启解压缩功能，以进行解压缩。

1. 如果您的 Firehose 直播尚未启用，请启用该 CloudWatch 指标。监控指标 `CloudWatchProcessorLambda_IncomingCompressedData` 并等待至该指标变为零。这将确认发送到您的 Lambda 函数的所有输入数据均已解压缩，并且不再需要 Lambda 函数。

1. 移除 Lambda 数据转换，因为您不再需要此项来解压缩您的流。

# 在 Firehose 流中禁用解压缩
<a name="writing-with-cloudwatch-logs-decompression-disabling-console"></a>

****

要禁用对数据流的解压缩，请使用 AWS 管理控制台

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中选择 **Amazon Data Firehose**。

1. 选择您要编辑的 Firehose 流。

1. 在 **Firehose 流详细信息**页面上，选择**配置**选项卡。

1. 在**转换记录**部分，选择**编辑**。

1. 在 “**从 Amazon CloudWatch Logs 解压源记录**” 下，清除 “**开启解压缩**”，然后选择 “**保存**更改”。

# 对 Firehose 中的解压缩进行问题排查
<a name="decomp-faq"></a>

下表显示了 Firehose 如何处理数据解压缩和处理期间的错误，包括将记录传输到 S3 错误存储桶、日志记录错误和发出指标。其中还解释了未经授权的数据放置操作返回的错误消息。


| 问题 | 解决方案 | 
| --- | --- | 
| 如果在解压缩过程中出现错误，源数据会发生什么？ |  如果 Amazon Data Firehose 无法解压缩记录，则记录将按原样（以压缩格式）传输到您在 Firehose 流创建期间指定的 S3 错误存储桶。除了记录外，传输的对象还包括错误代码和错误消息，这些对象将传输到名为 `decompression-failed` 的 S3 存储桶前缀。记录解压缩失败后，Firehose 将继续处理其他记录。  | 
| 成功解压缩后，如果处理管道出现错误，源数据会发生什么？ |  如果 Amazon Data Firehose 在解压后的处理步骤（例如，动态分区和数据格式转换）中出错，则记录将以压缩格式传输到您在创建 Firehose 流时指定的 S3 错误存储桶。除了记录外，传输的对象还包括错误代码和错误消息。  | 
| 如果出现错误或异常，要如何通知您？ |  如果在解压缩过程中出现错误或异常，如果您配置 CloudWatch 日志，Firehose 会将错误消息 CloudWatch 记录到日志中。此外，Firehose 还会向您可以 CloudWatch 监控的指标发送指标。您还可以选择根据 Firehose 发出的指标来创建警报。  | 
| 当put操作不是来自 CloudWatch 日志时会发生什么？ | 如果客户不是puts来自 CloudWatch Logs，则会返回以下错误消息：<pre>Put to Firehose failed for AccountId: <accountID>, FirehoseName:  <firehosename> because the request is not originating from allowed source types.</pre> | 
| Firehose 为解压缩功能发出了哪些指标？ | Firehose 发出每条记录的解压缩指标。您应该选择周期（1 分钟）、统计数据（总和）、日期范围，以获取 DecompressedRecords 失败或成功或 DecompressedBytes 失败或成功的次数。有关更多信息，请参阅 [CloudWatch 日志解压缩指标](monitoring-with-cloudwatch-metrics.md#decompression-metrics-cw)。 | 

# 向 Fire CloudWatch hose 发送事件
<a name="writing-with-cloudwatch-events"></a>

您可以通过 CloudWatch 向事件规则添加目标，将亚马逊配置为向 Firehose 流发送 CloudWatch 事件。

**为向现有 Firehose 直播发送事件的事件规则创建目标 CloudWatch**

1. 登录 AWS 管理控制台 并打开 CloudWatch 控制台，网址为[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。

1. 选择 **Create rule**（创建规则）。

1. 在**步骤 1：创建规则**页面上，对于**目标**，选择**添加目标**，然后选择 **Firehose 流**。

1. 选择现有的 **Firehose 流**。

有关创建 CloudWatch 事件规则的更多信息，请参阅 [Amazon CloudWatch 事件入门](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/CWE_GettingStarted.html)。

# 配置 AWS IoT 为向 Firehose 发送数据
<a name="writing-with-iot"></a>

您可以通过添加操作 AWS IoT 将信息配置为向 Firehose 直播发送信息。

**要创建向现有 Firehose 流发送事件的操作**

1. 在 AWS IoT 控制台中创建规则时，在**创建规则**页面的**设置一个或多个操作下，选择**添加操作****。

1. 选择**将消息发送到 Amazon Kinesis Firehose 流**。

1. 选择 **Configure action**。

1. 对于**流名称**，选择现有的 Firehose 流。

1. 对于 **Separator**，选择要在记录之间插入的分隔符字符。

1. 对于 **IAM 角色**，选择现有的 IAM 角色，或选择**创建新角色**。

1. 选择**添加操作**。

有关创建 AWS IoT 规则的更多信息，请参阅 [AWS IoT 规则教程](https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules-tutorial.html)。