本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Kinesis 代理写入 Amazon Kinesis Data Streams
Kinesis 代理是独立的 Java 软件应用程序,可提供更轻松的方式来收集数据并将数据发送到 Kinesis Data Streams。此代理持续监控一组文件,并将新数据发送到您的流。代理处理文件轮换、检查点操作并在失败时重试。它以可靠及时的简单方法提供您的所有数据。它还会发出 Amazon CloudWatch 指标,以帮助您更好地监控流处理并排除问题。
默认情况下,会基于换行符 ('\n'
) 分析每个文件中的记录。但是,也可以将代理配置为分析多行记录(请参阅 指定代理配置设置)。
您可以在基于 Linux 的服务器环境(如 Web 服务器、日志服务器和数据库服务器)上安装此代理。在安装代理后,通过指定要监控的日志文件和 名称来配置代理。在配置好代理之后,代理将持续从这些文件中收集数据并以可靠的方式将数据发送到流。
完成 Kinesis 代理的先决条件
-
您的操作系统必须是 Amazon Linux AMI 版本 2015.09 或更高版本,或者 Red Hat Enterprise Linux 版本 7 或更高版本。
-
如果您使用 Amazon EC2 运行代理,请启动 EC2 实例。
-
使用以下方法之一管理 AWS 凭证:
-
当您启动您的 EC2 实例时指定该 IAM 角色。
-
当您配置代理时指定 AWS 凭证(请参阅 awsAccessKeyId 和 awsSecretAccessKey)。
-
编辑
/etc/sysconfig/aws-kinesis-agent
以指定区域和 AWS 访问密钥。 -
如果 EC2 实例位于其他 AWS 账户中,请创建一个 IAM 角色来提供对 Kinesis Data Streams 服务的访问,并在配置代理时指定该角色(请参阅 assumeRoleARN 和 assumeRoleExternalId)。使用上述方法之一指定其他账户中有权担任该角色的用户的 AWS 凭证。
-
-
您指定的 IAM 角色或 AWS 凭证必须有权执行 Kinesis Data Streams PutRecords 操作,代理才能将数据发送到流。如果您为代理启用 CloudWatch 监控,则还需要拥有执行 CloudWatch PutMetricData 操作的权限。有关更多信息,请参阅 使用控制对 Amazon Kinesis Data Streams 资源的访问权限 IAM、使用亚马逊监控 Kinesis Data Streams 代理的运行状况 CloudWatch 和 CloudWatch 访问控制。
下载并安装代理
首先,连接到您的实例。有关详细信息,请参阅《Amazon EC2 用户指南》中的连接到您的实例。如果您在连接时遇到问题,请参阅《Amazon EC2 用户指南》中的排查 Amazon EC2 Linux 实例的连接问题。
使用 Amazon Linux AMI 设置代理
使用以下命令下载和安装代理:
sudo yum install –y aws-kinesis-agent
使用 Red Hat Enterprise Linux 设置代理
使用以下命令下载和安装代理:
sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
使用 GitHub 设置代理
-
从 awlabs/amazon-kinesis-agent
下载代理。 -
导航到下载目录并运行以下命令来安装代理:
sudo ./setup --install
在 Docker 容器中设置代理
Kinesis 代理可以在容器中运行,也可以通过 amazonlinux 容器库运行。使用以下 Dockerfile,然后运行 docker build
。
FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]
配置并启动代理
配置并启动代理
-
打开并编辑配置文件(如果使用默认文件访问权限,则以超级用户的身份来执行):
/etc/aws-kinesis/agent.json
在此配置文件中,指定代理从中收集数据的文件 (
"filePattern"
) 以及代理将数据发送到的流的名称 ("kinesisStream"
)。请注意,文件名是一种模式,并且代理能够识别文件轮换。每秒内您轮换使用文件或创建新文件的次数不能超过一次。代理使用文件创建时间戳来确定要跟踪并送入您的流中的文件;如果每秒创建新文件或轮换使用文件的次数超过一次,代理将无法正确区分这些文件。{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "yourkinesisstream
" } ] } -
手动启动代理:
sudo service aws-kinesis-agent start
-
(可选)将代理配置为在系统启动时启动:
sudo chkconfig aws-kinesis-agent on
现在,代理作为系统服务在后台运行。它会持续监控指定的文件,并将数据发送到指定的流。代理活动记录在 /var/log/aws-kinesis-agent/aws-kinesis-agent.log
中。
指定代理配置设置
代理支持两个必需的配置设置,即 filePattern
和 kinesisStream
,以及可用于其他功能的可选配置设置。您可以在 /etc/aws-kinesis/agent.json
中指定必需配置和可选配置。
只要您更改配置文件,就必须使用以下命令停止再启动代理:
sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start
或者,您也可以使用以下命令:
sudo service aws-kinesis-agent restart
一般的设置配置如下。
配置设置 | 描述 |
---|---|
assumeRoleARN |
由该用户担任的角色的 ARN。有关更多信息,请参阅《IAM 用户指南》中的使用 IAM 角色委托跨 AWS 账户的访问权限。 |
assumeRoleExternalId |
确定谁可以担任该角色的可选标识符。有关更多信息,请参阅《IAM 用户指南》中的如何使用外部 ID。 |
awsAccessKeyId |
覆盖默认凭证的 AWS 访问密钥 ID。此设置优先于所有其他凭证提供程序。 |
awsSecretAccessKey |
覆盖默认凭证的 AWS 密钥。此设置优先于所有其他凭证提供程序。 |
cloudwatch.emitMetrics |
如果设置为 true,则允许代理向 CloudWatch 发出指标。 默认:True |
cloudwatch.endpoint |
CloudWatch 的区域端点。 默认: |
kinesis.endpoint |
Kinesis Data Streams 的区域端点。 默认: |
流配置设置如下。
配置设置 | 描述 |
---|---|
dataProcessingOptions |
在将每个被分析的记录发送到流之前应用于这些记录的处理选项的列表。这些处理选项按指定的顺序执行。有关更多信息,请参阅 使用代理预处理数据。 |
kinesisStream |
[必需] 流名称。 |
filePattern |
[必需] 目录以及必须与之匹配的文件模式,以便被代理获取。对于与此模式匹配的所有文件,必须向 |
initialPosition |
开始解析文件的初始位置。有效值为 默认: |
maxBufferAgeMillis |
代理在将数据发送到流之前缓冲数据的最长时间(以毫秒计)。 值范围:1000 到 900000(1 秒到 15 分钟) 默认值:60,000(1 分钟) |
maxBufferSizeBytes |
代理在将数据发送到流之前缓冲的数据的最大大小(以字节计)。 值范围:1 到 4194304 (4 MB) 默认值:4194304 (4 MB) |
maxBufferSizeRecords |
代理在将数据发送到流之前缓冲数据的最大记录数。 值范围:1 到 500 默认值:500 |
minTimeBetweenFilePollsMillis |
代理轮询和分析受监控文件中是否有新数据的时间间隔(以毫秒计)。 值范围:1 或更大值 默认值:100 |
multiLineStartPattern |
用于标识记录开始的模式。记录由与模式匹配的行以及与模式不匹配的任何以下行组成。有效值为正则表达式。默认情况下,日志文件中的每一个新行都被解析为一条记录。 |
partitionKeyOption |
生成分区键的方法。有效值为 默认: |
skipHeaderLines |
代理从受监控文件开头跳过分析的行数。 值范围:0 或更大值 默认值:0(零) |
truncatedRecordTerminator |
代理在记录大小超过 Kinesis Data Streams 记录大小限制时用来截断已解析记录的字符串。(1000 KB) 默认值: |
监控多个文件目录并写入多个流
通过指定多个流程配置设置,您可以配置代理以监控多个文件目录并将数据发送到多个流。在以下配置示例中,代理监控两个文件目录,并将数据分别发送到 Kinesis 流和 Firehose 传输流。请注意,您可以为 Kinesis Data Streams 和 Firehose 指定不同的端点,这样您的 Kinesis 流和 Firehose 传输流就不需要位于同一区域。
{ "cloudwatch.emitMetrics":
true
, "kinesis.endpoint": "https://your/kinesis/endpoint
", "firehose.endpoint": "https://your/firehose/endpoint
", "flows": [ { "filePattern": "/tmp/app1.log*
", "kinesisStream": "yourkinesisstream
" }, { "filePattern": "/tmp/app2.log*
", "deliveryStream": "yourfirehosedeliverystream
" } ] }
有关将代理与 Firehose 结合使用的更多详细信息,请参阅 Writing to Amazon Data Firehose with Kinesis Agent。
使用代理预处理数据
代理可以预处理从受监控文件分析的记录,然后再将这些记录发送到流。通过将 dataProcessingOptions
配置设置添加到您的文件流可以启用此功能。可以添加一个或多个处理选项,这些选项将按指定的顺序执行。
代理支持下面列出的处理选项。由于此代理是开源的,您可以进一步开发和扩展其处理选项。您可以从 Kinesis 代理
处理选项
SINGLELINE
-
通过移除换行符和首尾的空格,将多行记录转换为单行记录。
{ "optionName": "SINGLELINE" }
CSVTOJSON
-
将记录从分隔符分隔的格式转换为 JSON 格式。
{ "optionName": "CSVTOJSON", "customFieldNames": [ "
field1
", "field2
",...
], "delimiter": "yourdelimiter
" }customFieldNames
-
[必需] 在每个 JSON 键值对中用作键的字段名称。例如,如果您指定
["f1", "f2"]
,则记录“v1, v2”将转换为{"f1":"v1","f2":"v2"}
。 delimiter
-
在记录中用作分隔符的字符串。默认值为逗号(,)。
LOGTOJSON
-
将记录从日志格式转换为 JSON 格式。支持的日志格式为 Apache Common Log、Apache Combined Log、Apache Error Log 和 RFC3164 Syslog。
{ "optionName": "LOGTOJSON", "logFormat": "
logformat
", "matchPattern": "yourregexpattern
", "customFieldNames": [ "field1
", "field2
",…
] }logFormat
-
[必需] 日志条目格式。以下是可能的值:
-
COMMONAPACHELOG
– Apache 通用日志格式。默认情况下每个日志条目都为以下模式:“%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}
”。 -
COMBINEDAPACHELOG
– Apache 组合日志格式。默认情况下每个日志条目都为以下模式:“%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}
”。 -
APACHEERRORLOG
– Apache 错误日志格式。默认情况下每个日志条目都为以下模式:“[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}
”。 -
SYSLOG
– RFC3164 系统日志格式。默认情况下每个日志条目都为以下模式:“%{timestamp} %{hostname} %{program}[%{processid}]: %{message}
”。
-
matchPattern
-
用于从日志条目中提取值的正则表达式模式。如果您的日志条目不属于其中一种预定义日志格式,则将使用此设置。如果使用此设置,您还必须指定
customFieldNames
。 customFieldNames
-
在每个 JSON 键值对中用作键的自定义字段名称。您可以使用此设置定义从
matchPattern
中提取的值的字段名称,或覆盖预定义日志格式的默认字段名称。
例 :LOGTOJSON 配置
下面是一个转换为 JSON 格式的 Apache 通用日志条目的 LOGTOJSON
配置示例:
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }
转换前:
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
转换后:
{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
例 :包含自定义字段的 LOGTOJSON 配置
下面是 LOGTOJSON
配置的另一个示例:
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }
使用此配置设置时,上一个示例中的同一个 Apache 通用日志条目将如下转换为 JSON 格式:
{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
例 :转换 Apache 通用日志条目
以下流配置将一个 Apache 通用日志条目转换为 JSON 格式的单行记录:
{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "my-stream
", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
例 :转换多行记录
以下流配置分析第一行以“[SEQUENCE=
”开头的多行记录。每个记录先转换为一个单行记录。然后,将基于制表分隔符从记录中提取值。提取的值映射到指定的 customFieldNames
值,从而形成 JSON 格式的单行记录。
{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "my-stream
", "multiLineStartPattern": "\\[SEQUENCE=
", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1
", "field2
", "field3
" ], "delimiter": "\\t
" } ] } ] }
例 :具有匹配模式的 LOGTOJSON 配置
下面是一个转换为 JSON 格式的 Apache 通用日志条目的 LOGTOJSON
配置示例,其中省略了最后一个字段 (bytes):
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }
转换前:
123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200
转换后:
{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}
使用代理 CLI 命令
系统启动时自动启动代理:
sudo chkconfig aws-kinesis-agent on
检查代理的状态:
sudo service aws-kinesis-agent status
停止代理:
sudo service aws-kinesis-agent stop
从此位置读取代理的日志文件:
/var/log/aws-kinesis-agent/aws-kinesis-agent.log
卸载代理:
sudo yum remove aws-kinesis-agent
常见问题解答
有没有适用于 Windows 的 Kinesis 代理?
适用于 Windows 的 Kinesis 代理与适用于 Linux 平台的 Kinesis 代理是不同的软件。
为什么 Kinesis 代理速度变慢且/或 RecordSendErrors
增加?
这通常是由于 Kinesis 节流造成的。查看 Kinesis Data Streams 的 WriteProvisionedThroughputExceeded
指标或 Firehose 传输流的 ThrottledRecords
指标。这些指标从 0 开始的任何增加都表示需要增加流限制。有关更多信息,请参阅 Kinesis Data Stream limits 和 Amazon Firehose Delivery Streams。
排除节流后,查看 Kinesis 代理是否配置为跟踪大量小文件。Kinesis 代理跟踪新文件时会有延迟,因此 Kinesis 代理应跟踪少量大文件。尝试将您的日志文件合并为大文件。
为什么我会收到 java.lang.OutOfMemoryError
异常?
Kinesis 代理没有足够的内存来处理当前的工作负载。尝试增加 /usr/bin/start-aws-kinesis-agent
中的 JAVA_START_HEAP
和 JAVA_MAX_HEAP
,并重新启动代理。
为什么我会收到 IllegalStateException : connection pool shut down
异常?
Kinesis 代理没有足够的连接来处理当前的工作负载。尝试在 /etc/aws-kinesis/agent.json
的常规代理配置设置中增加 maxConnections
和 maxSendingThreads
。这些字段的默认值是可用运行时系统处理器的 12 倍。有关高级代理配置设置的更多信息,请参阅 AgentConfiguration.java
如何调试 Kinesis 代理的其他问题?
可在 /etc/aws-kinesis/log4j.xml
中启用 DEBUG
级别日志。
我应该如何配置 Kinesis 代理?
maxBufferSizeBytes
越小,Kinesis 代理发送数据的频率就越高。这可能是好事,因为这减少了记录的传输时间,但也增加了每秒对 Kinesis 的请求。
为什么 Kinesis 代理会发送重复记录?
这是由于文件跟踪中的错误配置造成的。确保每个 fileFlow’s filePattern
只匹配一个文件。如果正在使用的 logrotate
模式处于 copytruncate
模式,也可能发生这种情况。尝试将模式更改为默认模式或创建模式以避免重复。有关处理重复记录的更多信息,请参阅处理重复记录。