翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Kinesis エージェントを使用して Amazon Kinesis Data Streams に書き込む
Kinesis エージェントはスタンドアロンの Java ソフトウェアアプリケーションであり、データを収集して Kinesis Data Streams に送信する簡単な方法です。このエージェントは一連のファイルを継続的に監視し、新しいデータをストリームに送信します。エージェントはファイルのローテーション、チェックポイント、失敗時の再試行を処理します。タイムリーで信頼性の高い簡単な方法で、すべてのデータを提供します。また、ストリーミング処理のモニタリングとトラブルシューティングに役立つ Amazon CloudWatch メトリクスも出力します。
デフォルトでは、レコードは改行文字 ('\n'
) に基づいて各ファイルから解析されます。しかし、複数行レコードを解析するよう、エージェントを設定することもできます (エージェント設定を指定するを参照)。
このエージェントは、ウェブサーバー、ログサーバーおよびデータベースサーバーなど、Linux ベースのサーバー環境にインストールできます。エージェントをインストールした後で、モニタリング用のファイルとデータストリームを指定して設定します。エージェントが設定されると、ファイルから永続的にデータを収集して、ストリームに安全にデータを送信します。
トピック
Kinesis Agent の前提条件を完了する
-
オペレーティングシステムは Amazon Linux AMI バージョン 2015.09 以降、または Red Hat Enterprise Linux バージョン 7 以降でなければなりません。
-
Amazon EC2 を使用してエージェントを実行している場合は、EC2 インスタンスを起動します。
-
次のいずれかの方法を使用して AWS 認証情報を管理します。
-
EC2 インスタンスを起動する際に IAM ロールを指定します。
-
エージェントを設定するときに AWS 認証情報を指定します (awsAccessKeyId と awsSecretAccessKey を参照)。
-
を編集
/etc/sysconfig/aws-kinesis-agent
して、リージョンと AWS アクセスキーを指定します。 -
EC2 インスタンスが別の AWS アカウントにある場合は、Kinesis Data Streams サービスへのアクセスを提供する IAM ロールを作成し、エージェントを設定するときにそのロールを指定します (assumeRoleARN」とassumeRoleExternalId」を参照)。前述の方法のいずれかを使用して、このロールを引き受けるアクセス許可を持つ他のアカウントのユーザーの AWS 認証情報を指定します。
-
-
指定する IAM ロールまたは AWS 認証情報には、エージェントがストリームにデータを送信するために Kinesis Data Streams PutRecords オペレーションを実行するアクセス許可が必要です。エージェントの CloudWatch モニタリングを有効にしている場合は、CloudWatch PutMetricData オペレーションを実行する許可も必要になります。詳細については、IAM を使用した Amazon Kinesis Data Streams リソースへのアクセスの制御、Amazon CloudWatch による Kinesis Data Streams エージェントのヘルスに監視する、およびCloudWatch アクセスコントロールを参照してください。
エージェントをダウンロードしてインストールする
最初に、インスタンスに接続します。詳細については、「Amazon EC2 ユーザーガイド」の「Linux インスタンスへの接続」を参照してください。接続できない場合は、「Amazon EC2 ユーザーガイド」の「インスタンスへの接続に関するトラブルシューティング」を参照してください。
Amazon Linux AMI を使用してエージェントを設定する
次のコマンドを使用して、エージェントをダウンロードしてインストールします。
sudo yum install –y aws-kinesis-agent
Red Hat Enterprise Linux を使用してエージェントを設定する
次のコマンドを使用して、エージェントをダウンロードしてインストールします。
sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
GitHubを使用してエージェントを設定する
-
エージェントを awlabs/amazon-kinesis-agent
からダウンロードします。 -
ダウンロードしたディレクトリまで移動し、次のコマンドを実行してエージェントをインストールします。
sudo ./setup --install
Docker コンテナにエージェントをセットアップするには
Kinesis Agent は、amazonlinux コンテナベースを使ってコンテナで実行することもできます。次の Docker ファイルを使用し、docker build
を実行します。
FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]
エージェントを設定して開始する
エージェントを設定して開始するには
-
(デフォルトのファイルアクセス許可を使用している場合、スーパーユーザーとして) 設定ファイル (
/etc/aws-kinesis/agent.json
) を開き、編集します。この設定ファイルで、エージェントがデータを集めるファイル (
"filePattern"
) とエージェントがデータを送信するストリーム ("kinesisStream"
) を指定します。ファイル名はパターンで、エージェントはファイルローテーションを確認する点に注意してください。1 秒あたりに一度だけ、ファイルを交替または新しいファイルを作成できます。エージェントはファイル作成タイムスタンプを使用して、どのファイルを追跡してストリームに送信するかを判断します。新規ファイルの作成やファイルの交換を 1 秒あたりに一度以上頻繁に交換すると、エージェントはそれらを適切に区別できません。{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "yourkinesisstream
" } ] } -
エージェントを手動で開始する:
sudo service aws-kinesis-agent start
-
(オプション) システムスタートアップ時にエージェントを開始するように設定します。
sudo chkconfig aws-kinesis-agent on
エージェントは、システムのサービスとしてバックグラウンドで実行されます。継続的に指定ファイルをモニタリングし、指定されたストリームにデータを送信します。エージェント活動は、/var/log/aws-kinesis-agent/aws-kinesis-agent.log
に記録されます。
エージェント設定を指定する
エージェントは、2つの必須設定、filePattern
と kinesisStream
、さらに追加機能として任意の設定をサポートしています。必須およびオプションの設定を /etc/aws-kinesis/agent.json
で指定できます。
設定ファイルを変更した場合は、次のコマンドを使用してエージェントを停止および起動する必要があります。
sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start
または、次のコマンドを使用できます。
sudo service aws-kinesis-agent restart
全般設定は次のとおりです。
構成設定 | 説明 |
---|---|
assumeRoleARN |
ユーザーが引き受けるロールの ARN。詳細については、「IAM ユーザーガイド」の「IAM ロールを使用した AWS アカウント間のアクセスの委任」を参照してください。 |
assumeRoleExternalId |
ロールを引き受けることができるユーザーを決定するオプションの ID。詳細については、IAM ユーザーガイドの外部 ID の使用方法を参照してください。 |
awsAccessKeyId |
AWS デフォルトの認証情報を上書きする アクセスキー ID。この設定は、他のすべての認証情報プロバイダーに優先されます。 |
awsSecretAccessKey |
AWS デフォルトの認証情報を上書きする シークレットキー。この設定は、他のすべての認証情報プロバイダーに優先されます。 |
cloudwatch.emitMetrics |
エージェントがメトリクスを CloudWatch に出力できるようにします (true に設定された場合)。 デフォルト: true |
cloudwatch.endpoint |
CloudWatch のリージョンのエンドポイントです。 デフォルト: |
kinesis.endpoint |
Kinesis Data Streams のリージョンのエンドポイントです。 デフォルト: |
フロー設定は次のとおりです。
構成設定 | 説明 |
---|---|
dataProcessingOptions |
ストリームに送信される前に解析された各レコードに適用されるの処理オプションの一覧。処理オプションは指定した順序で実行されます。詳細については、エージェントを使用してデータを事前処理するを参照してください。 |
kinesisStream |
[必須] ストリームの名前。 |
filePattern |
[必須] エージェントによって取得されるために一致する必要があるディレクトリとファイルパターン。このパターンに一致するすべてのファイルは、読み取り権限を |
initialPosition |
ファイルの解析が開始される最初の位置。有効な値は、 デフォルト: |
maxBufferAgeMillis |
エージェントがストリームに送信する前にデータをバッファーする最大時間 (ミリ秒)。 値の範囲: 1,000~900,000 (1 秒~15 分) デフォルト: 60,000 (1 分) |
maxBufferSizeBytes |
エージェントがストリームに送信する前にデータをバッファーする最大サイズ (バイト)。 値の範囲: 1~4,194,304 (4 MB) デフォルト: 4,194,304 (4 MB) |
maxBufferSizeRecords |
エージェントがストリームに送信する前にデータをバッファーするレコードの最大数。 値の範囲: 1 ~ 500 デフォルト: 500 |
minTimeBetweenFilePollsMillis |
エージェントが新しいデータのモニタリング対象ファイルをポーリングし、解析する時間間隔 (ミリ秒単位)。 値の範囲: 1 以上 デフォルト: 100 |
multiLineStartPattern |
レコードの開始を識別するパターン。レコードは、パターンに一致する 1 行と、それに続くパターンに一致しない行で構成されます。有効な値は正規表現です。デフォルトでは、ログファイルのそれぞれの改行は 1 つのレコードとして解析されます。 |
partitionKeyOption |
パーティションのキーを生成する方法。有効な値は デフォルト: |
skipHeaderLines |
モニタリング対象ファイルの始めにエージェントが解析をスキップするの行数。 値の範囲: 0 以上 デフォルト: 0 (ゼロ) |
truncatedRecordTerminator |
レコードのサイズが Kinesis Data Streams レコードの許容サイズを超えたときに解析されたレコードを切り捨てるために、エージェントが使用する文字列。(1,000 KB) デフォルト: |
複数のファイルディレクトリを監視し、複数のストリームに書き込む
複数のフロー設定を指定することによって、エージェントが複数のファイルディレクトリを監視し、複数のストリームにデータを送信するように設定できます。以下の設定例では、エージェントは 2 つのファイルディレクトリをモニタリングし、それぞれ Kinesis ストリームおよび Firehose 配信ストリームにデータを送信します。Kinesis Data Streams と Firehose に異なるエンドポイントを指定できるため、Kinesis ストリームと Firehose 配信ストリームが同じリージョンに存在する必要はありません。
{ "cloudwatch.emitMetrics":
true
, "kinesis.endpoint": "https://your/kinesis/endpoint
", "firehose.endpoint": "https://your/firehose/endpoint
", "flows": [ { "filePattern": "/tmp/app1.log*
", "kinesisStream": "yourkinesisstream
" }, { "filePattern": "/tmp/app2.log*
", "deliveryStream": "yourfirehosedeliverystream
" } ] }
Firehose でのエージェントの使用の詳細については、「Kinesis エージェントを使用した Amazon Data Firehose への書き込み」を参照してください。
エージェントを使用してデータを事前処理する
エージェントはストリームにレコードを送信する前に、モニタリング対象ファイルから解析したレコードを事前処理できます。ファイルフローに dataProcessingOptions
設定を追加することで、この機能を有効にできます。1 つ以上の処理オプションを追加でき、また指定されている順序で実行されます。
エージェントは、リストされた次の処理オプションに対応しています。エージェントはオープンソースであるため、処理オプションを開発および拡張できます。Kinesis エージェント
処理オプション
SINGLELINE
-
改行文字、先頭のスペース、末尾のスペースを削除することで、複数行レコードを単一行レコードに変換します。
{ "optionName": "SINGLELINE" }
CSVTOJSON
-
区切り形式から JSON 形式にレコードを変換します。
{ "optionName": "CSVTOJSON", "customFieldNames": [ "
field1
", "field2
",...
], "delimiter": "yourdelimiter
" }customFieldNames
-
[必須] 各 JSON キー値のペアでキーとして使用されるフィールド名。たとえば、
["f1", "f2"]
を指定した場合は、レコードv1、v2は{"f1":"v1","f2":"v2"}
に変換されます。 delimiter
-
レコードで区切り記号として使用する文字列。デフォルトはカンマ (,) です。
LOGTOJSON
-
ログ形式から JSON 形式にレコードを変換します。サポートされているログ形式は、Apache Common Log、Apache Combined Log、Apache Error Log、および RFC3164 Syslog です。
{ "optionName": "LOGTOJSON", "logFormat": "
logformat
", "matchPattern": "yourregexpattern
", "customFieldNames": [ "field1
", "field2
",…
] }logFormat
-
[必須] ログエントリ形式。以下の値を指定できます。
-
COMMONAPACHELOG
— Apache Common Log 形式。各ログエントリは、デフォルトで次のパターン%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}
になります。 -
COMBINEDAPACHELOG
— Apache Combined Log 形式。各ログエントリは、デフォルトで次のパターン%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}
になります。 -
APACHEERRORLOG
— Apache Error Log 形式。各ログエントリは、デフォルトで次のパターン[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}
になります。 -
SYSLOG
— FC3164 Syslog 形式。各ログエントリは、デフォルトで次のパターン%{timestamp} %{hostname} %{program}[%{processid}]: %{message}
になります。
-
matchPattern
-
ログエントリから値を取得するために使用する正規表現パターン。この設定は、ログエントリが定義されたログ形式の一つとして存在していない場合に使用されます。この設定を使用する場合は、
customFieldNames
を指定する必要があります。 customFieldNames
-
JSON キー値のペアでキーとして使用されるカスタムフィールド名。
matchPattern
から抽出した値のフィールド名を定義するために、または事前定義されたログ形式のデフォルトのフィールド名を上書きするために、この設定を使用できます。
例 : LOGTOJSON 設定
JSON形式に変換された Apache Common Log エントリの LOGTOJSON
設定の一つの例を次に示します。
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }
変換前:
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
変換後:
{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
例 : カスタムフィールドがある LOGTOJSON 設定
こちらは LOGTOJSON
設定の別の例です。
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }
この設定では、前の例からの同じ Apache Common Log エントリは、次のように JSON 形式に変換されます。
{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
例 : Apache Common Log エントリの変換
次のフロー設定は Apache Common Log エントリを JSON 形式の単一行レコードに変換します。
{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "my-stream
", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
例 : 複数行レコードの変換
次のフロー設定は、最初の行が[SEQUENCE=
で開始している複数行レコードを解析します。各レコードはまず単一行レコードに変換されます。次に、値はタブの区切り記号に基づいたレコードから取得されます。取得された値は指定された customFieldNames
値にマッピングされ、JSON 形式の単一行レコードを形成します。
{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "my-stream
", "multiLineStartPattern": "\\[SEQUENCE=
", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1
", "field2
", "field3
" ], "delimiter": "\\t
" } ] } ] }
例 : 一致パターンで LOGTOJSON 設定
こちらは、最後のフィールド (バイト) が省略された JSON 形式に変換された Apache Common Log エントリの LOGTOJSON
設定の一例です。
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }
変換前:
123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200
変換後:
{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}
エージェント CLI コマンドを使用する
システムスタートアップ時のエージェントの自動的開始:
sudo chkconfig aws-kinesis-agent on
エージェントのステータスの確認:
sudo service aws-kinesis-agent status
エージェントの停止:
sudo service aws-kinesis-agent stop
この場所からエージェントのログファイルを読む:
/var/log/aws-kinesis-agent/aws-kinesis-agent.log
エージェントのアンインストール:
sudo yum remove aws-kinesis-agent
よくある質問
Windows 用の Kinesis Agent はありますか?
Windows 用 Kinesis Agent は Linux プラットフォーム用 Kinesis Agent とは異なるソフトウェアです。
Kinesis Agent の速度が低下したり、RecordSendErrors
が増加したりするのはなぜですか?
通常、これは Kinesis のスロットリングが原因です。Kinesis Data Streams の WriteProvisionedThroughputExceeded
メトリクス、または Firehose 配信ストリームの ThrottledRecords
メトリクスをチェックします。これらのメトリクスが 0 を超えている場合は、ストリームの上限を引き上げる必要があることを意味します。詳細については、「Kinesis Data Stream limits」と「Amazon Firehose Delivery Streams」を参照してください。
スロットリングが原因ではないことがわかったら、Kinesis Agent が大量の小規模ファイルをテーリングするように設定されているかどうかを確認してください。Kinesis Agent が新しいファイルをテーリングするときには遅延が発生するため、少量の大きなファイルをテーリングするようにします。ログファイルを大きなファイルに統合してみてください。
java.lang.OutOfMemoryError
の例外が発生するのはなぜですか?
Kinesis Agent に、現在のワークロードを処理するための十分なメモリがないためです。/usr/bin/start-aws-kinesis-agent
で JAVA_START_HEAP
と JAVA_MAX_HEAP
を増やしてエージェントを再起動してみてください。
IllegalStateException : connection pool shut down
の例外が発生するのはなぜですか?
Kinesis エージェントに、現在のワークロードを処理するための十分な接続がないためです。/etc/aws-kinesis/agent.json
の一般的なエージェント設定で maxConnections
と maxSendingThreads
を増やしてみてください。これらのフィールドのデフォルト値は、使用可能なランタイムプロセッサの 12 倍です。高度なエージェント設定については、「AgentConfiguration.java
Kinesis Agent に関する別の問題をデバッグする方法を教えてください。
DEBUG
レベルログは /etc/aws-kinesis/log4j.xml
で有効にできます。
Kinesis Agent はどのように設定するとよいですか?
maxBufferSizeBytes
の値が小さいほど、Kinesis Agent がデータを送信する頻度が高くなります。そのため、レコードの配信時間が短縮されますが、Kinesis への 1 秒あたりのリクエスト数も増えます。
Kinesis Agent が重複レコードを送信するのはなぜですか?
これはファイルテーリングの設定ミスが原因です。各 fileFlow’s filePattern
が それぞれ 1 つのファイルのみと一致するようにします。また、使用されている logrotate
モードが copytruncate
モードになっている場合にも発生することがあります。重複を避けるため、モードをデフォルトか作成モードに変更してみてください。重複レコードの処理に関する詳細は、「Handling Duplicate Records」を参照してください。