Schreiben Sie mit Kinesis Agent in Amazon Kinesis Data Streams - Amazon Kinesis Data Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Schreiben Sie mit Kinesis Agent in Amazon Kinesis Data Streams

Kinesis Agent ist eine eigenständige Java-Anwendung, mit der Sie Daten einfach erfassen und an Kinesis Data Streams senden können. Der Agent überwacht kontinuierlich eine Reihe von Dateien und sendet neue Daten an Ihren Stream. Der Agent übernimmt die Dateirotation, das Checkpointing und die Wiederholung bei Fehlern. Er sorgt für eine zuverlässige, zeitgerechte und einfache Bereitstellung Ihrer Daten. Außerdem werden CloudWatch Amazon-Metriken ausgegeben, damit Sie den Streaming-Prozess besser überwachen und Fehler beheben können.

Standardmäßig werden Datensätze aus den einzelnen Dateien anhand des Zeilenumbruchzeichens ('\n') analysiert. Der Agent kann jedoch auch für die Analyse mehrzeiliger Datensätze konfiguriert werden (siehe Geben Sie die Einstellungen für die Agentenkonfiguration an).

Sie können den Agenten in Linux-Serverumgebungen installieren, beispielsweise auf Webservern, Protokollservern und Datenbankservern. Nach der Installation des Agenten konfigurieren Sie diesen, indem Sie die zu überwachenden Dateien sowie den Stream für die Daten angeben. Nachdem der Agent konfiguriert wurde, sammelt er permanent Daten aus den Dateien und sendet sie zuverlässig an den Stream.

Erfüllen Sie die Voraussetzungen für Kinesis Agent

Laden Sie den Agenten herunter und installieren Sie ihn

Stellen Sie zunächst eine Verbindung mit Ihrer Instance her. Weitere Informationen finden Sie unter Connect to Your Instance im EC2 Amazon-Benutzerhandbuch. Wenn Sie Probleme mit der Verbindung haben, finden Sie weitere Informationen unter Problembehandlung beim Herstellen einer Verbindung zu Ihrer Instance im EC2 Amazon-Benutzerhandbuch.

So richten Sie den Agenten mithilfe der Amazon Linux AMI ein

Verwenden Sie den folgenden Befehl zum Herunterladen und Installieren des Agenten:

sudo yum install –y aws-kinesis-agent
So richten Sie den Agenten mit Red Hat Enterprise Linux ein

Verwenden Sie den folgenden Befehl zum Herunterladen und Installieren des Agenten:

sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
Um den Agenten einzurichten mit GitHub
  1. Laden Sie den Agenten von amazon-kinesis-agentawlabs/ herunter.

  2. Installieren Sie den Agenten, indem Sie zum Download-Verzeichnis navigieren und den folgenden Befehl ausführen:

    sudo ./setup --install
So richten Sie den Agenten in einem Docker-Container ein

Kinesis Agent kann auch in einem Container über die amazonlinux-Containerbasis ausgeführt werden. Verwenden Sie die folgende Docker-Datei und führen Sie dann docker build aus.

FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]

Konfigurieren und starten Sie den Agenten

So konfigurieren und starten Sie den Agenten
  1. Öffnen und bearbeiten Sie die Konfigurationsdatei (als Superuser, wenn Sie standardmäßige Dateizugriffsberechtigungen nutzen): /etc/aws-kinesis/agent.json

    Geben Sie in der Konfigurationsdatei die Dateien ("filePattern") an, aus denen der Agent Daten sammelt, sowie den Namen des Streams ("kinesisStream", an den der Agent die Daten sendet. Beachten Sie, dass der Dateiname ein Muster ist und der Agent Dateirotationen erkennt. Sie können nur einmal pro Sekunde Dateien rotieren oder neue Dateien erstellen. Der Agent verwendet den Zeitstempel der Dateierstellung, um die Dateien zu ermitteln, die gesammelt und zum Stream hinzugefügt werden. Wenn häufiger als einmal pro Sekunde Dateien rotiert oder neue Dateien erstellt werden, kann der Agent nicht mehr ordnungsgemäß zwischen den Dateien unterscheiden.

    { "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "yourkinesisstream" } ] }
  2. Starten Sie den Agenten manuell:

    sudo service aws-kinesis-agent start
  3. (Optional) Konfigurieren Sie den Agenten so, dass er beim Startup des Systems gestartet wird:

    sudo chkconfig aws-kinesis-agent on

Der Agent wird jetzt als Systemdienst im Hintergrund ausgeführt. Er überwacht kontinuierlich die angegebenen Dateien und sendet Daten an den angegebenen Stream. Die Agentenaktivität wird in /var/log/aws-kinesis-agent/aws-kinesis-agent.log protokolliert.

Geben Sie die Einstellungen für die Agentenkonfiguration an

Der Agent unterstützt die beiden obligatorischen Konfigurationseinstellungen filePattern und kinesisStream sowie optionale Konfigurationseinstellungen für zusätzliche Funktionen. Sie können sowohl die obligatorische als auch die optionale Konfiguration in /etc/aws-kinesis/agent.json angeben.

Wenn Sie die Konfigurationsdatei ändern, müssen Sie den Agenten mit den folgenden Befehlen anhalten und starten:

sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

Alternativ können Sie auch den folgenden Befehl nutzen:

sudo service aws-kinesis-agent restart

Im Folgenden finden Sie die allgemeinen Konfigurationseinstellungen.

Konfigurationseinstellung Beschreibung
assumeRoleARN

Der ARN der Rolle, die vom Benutzer übernommen werden muss. Weitere Informationen finden Sie unter AWS Kontenübergreifendes Delegieren des Zugriffs mithilfe von IAM-Rollen im IAM-Benutzerhandbuch.

assumeRoleExternalId

Eine optionale Kennung, die festlegt, wer die Rolle übernehmen kann. Weitere Informationen finden Sie unter Verwendung einer externen ID im IAM-Benutzerhandbuch.

awsAccessKeyId

AWS Zugriffsschlüssel-ID, die die Standardanmeldedaten überschreibt. Diese Einstellung hat Vorrang vor allen anderen Anbietern von Anmeldeinformationen.

awsSecretAccessKey

AWS geheimer Schlüssel, der die Standardanmeldedaten überschreibt. Diese Einstellung hat Vorrang vor allen anderen Anbietern von Anmeldeinformationen.

cloudwatch.emitMetrics

Ermöglicht dem Agenten, Metriken auszusenden, CloudWatch sofern diese Einstellung gesetzt ist (true).

Standard: true

cloudwatch.endpoint

Der regionale Endpunkt für CloudWatch.

Standard: monitoring.us-east-1.amazonaws.com

kinesis.endpoint

Der regionale Endpunkt für Kinesis Data Streams.

Standard: kinesis.us-east-1.amazonaws.com

Im Folgenden finden Sie die Konfigurationseinstellungen für den Ablauf.

Konfigurationseinstellung Beschreibung
dataProcessingOptions

Die Liste der Verarbeitungsoptionen, die auf jeden analysierten Datensatz angewendet werden, ehe ein Datensatz an den Stream gesendet wird. Die Verarbeitungsoptionen werden in der angegebenen Reihenfolge ausgeführt. Weitere Informationen finden Sie unter Verwenden Sie den Agenten zur Vorverarbeitung von Daten.

kinesisStream

[Erforderlich] Der Name des Streams.

filePattern

[Erforderlich] Das Verzeichnis- und Dateimuster, das abgeglichen werden muss, damit es vom Agenten abgerufen werden kann. Für alle Dateien, die mit diesem Muster übereinstimmen, müssen Leseberechtigungen für aws-kinesis-agent-user vergeben werden. Für das die Dateien enthaltende Verzeichnis müssen Lese- und Ausführberechtigungen für aws-kinesis-agent-user vergeben werden.

initialPosition

Die Position, an der mit der Analyse der Datei begonnen wurde. Gültige Werte sind START_OF_FILE und END_OF_FILE.

Standard: END_OF_FILE

maxBufferAgeMillis

Die maximale Zeit (in Millisekunden), während der durch den Agenten Daten gepuffert werden, bevor sie an den Stream gesendet werden.

Wertebereich: 1.000 bis 900.000 (1 Sekunde bis 15 Minuten)

Standard: 60.000 (1 Minute)

maxBufferSizeBytes

Die maximale Größe (in Bytes), bis zu der durch den Agenten Daten gepuffert werden, bevor sie an den Stream gesendet werden.

Wertebereich: 1 bis 4.194.304 (4 MB)

Standard: 4.194.304 (4 MB)

maxBufferSizeRecords

Die maximale Anzahl der Datensätze, für die der Agent Daten puffert, ehe diese an den Stream gesendet werden.

Wertebereich: 1 bis 500

Standard: 500

minTimeBetweenFilePollsMillis

Das Zeitintervall (in Millisekunden), in dem der Agent die überwachten Dateien auf neue Daten abfragt und analysiert.

Wertbereich: 1 oder höher

Standard: 100

multiLineStartPattern

Das Muster für die Identifizierung des Datensatzbeginns. Ein Datensatz besteht aus einer Zeile, die mit dem angegebenen Muster übereinstimmt, und allen folgenden Zeilen, die nicht dem Muster entsprechen. Gültige Werte sind reguläre Ausdrücke. Standardmäßig wird jede neue Zeile in den Protokolldateien als einziger Datensatz analysiert.

partitionKeyOption

Die Methode zum Erstellen des Partitionsschlüssels. Gültige Werte sind RANDOM (zufällig generierte Ganzzahl) und DETERMINISTIC (ein aus den Daten berechneter Hash-Wert).

Standard: RANDOM

skipHeaderLines

Die Anzahl der Zeilen, die der Agent überspringt, ehe mit der Analyse der überwachten Dateien begonnen wird.

Wertbereich: 0 oder höher

Standard: 0 (null)

truncatedRecordTerminator

Die Zeichenfolge, mit der der Agent einen analysierten Datensatz kürzt, wenn die Datensatzgröße das zulässige Datensatz-Limit von Kinesis Data Streams überschreitet. (1,000 KB)

Standard: '\n' (Zeilenumbruch)

Überwachen Sie mehrere Dateiverzeichnisse und schreiben Sie in mehrere Streams

Wenn Sie mehrere Ablaufkonfigurationseinstellungen angeben, können Sie den Agenten so konfigurieren, dass er mehrere Dateiverzeichnisse überwacht und Daten an verschiedene Streams sendet. Im folgenden Konfigurationsbeispiel überwacht der Agent zwei Dateiverzeichnisse und sendet Daten an einen Kinesis-Stream bzw. einen Firehose-Lieferstream. Beachten Sie, dass Sie unterschiedliche Endpunkte für Kinesis Data Streams und Firehose angeben können, sodass sich Ihr Kinesis-Stream und Ihr Firehose-Lieferstream nicht in derselben Region befinden müssen.

{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

Ausführlichere Informationen zur Verwendung des Agenten mit Firehose finden Sie unter Writing to Amazon Data Firehose with Kinesis Agent.

Verwenden Sie den Agenten zur Vorverarbeitung von Daten

Der Agent kann die Datensätze vorverarbeiten, die aus den überwachten Dateien analysiert wurden, ehe diese an Ihren Stream gesendet werden. Sie können dieses Feature aktivieren, indem Sie Ihrem Dateifluss die Konfigurationseinstellung dataProcessingOptions hinzufügen. Sie können eine oder mehrere Verarbeitungsoptionen hinzufügen. Diese werden in der angegebenen Reihenfolge ausgeführt.

Der Agent unterstützt die folgenden aufgelisteten Verarbeitungsoptionen. Der Agent ist ein Open-Source-Tool, sodass Sie dessen Verarbeitungsoptionen optimieren und erweitern können. Sie können den Agenten von Kinesis Agent herunterladen.

Verarbeitungsoptionen
SINGLELINE

Konvertiert einen mehrzeiligen Datensatz in einen einzeiligen Datensatz, indem Zeilenumbruchzeichen sowie vorangestellte und folgende Leerzeichen entfernt werden.

{ "optionName": "SINGLELINE" }
CSVTOJSON

Konvertiert einen Datensatz aus dem durch Trennzeichen getrennten Format in einen Datensatz im JSON-Format.

{ "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
customFieldNames

[Erforderlich] Die Feldnamen, die als Schlüssel in den einzelnen JSON-Schlüssel-Wert-Paaren verwendet werden. Wenn Sie beispielsweise ["f1", "f2"] angeben, wird der Datensatz „v1, v2“ in {"f1":"v1","f2":"v2"} konvertiert.

delimiter

Die Zeichenfolge, die als Trennzeichen im Datensatz verwendet wird. Standardmäßig wird ein Komma (,) verwendet.

LOGTOJSON

Konvertiert einen Datensatz aus einem Protokollformat in einen Datensatz im JSON-Format. Folgende Protokollformate werden unterstützt: Apache Common Log, Apache Combined Log, Apache Error Log und RFC3164 Syslog.

{ "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
logFormat

[Erforderlich] Das Format des Protokolleintrags. Folgende Werte sind möglich:

  • COMMONAPACHELOG – Das Apache-Common-Log-Format. Jeder Protokolleintrag weist standardmäßig das folgende Muster auf: „%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}“.

  • COMBINEDAPACHELOG – Das Apache-Combined-Log-Format. Jeder Protokolleintrag weist standardmäßig das folgende Muster auf: „%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}“.

  • APACHEERRORLOG – Das Apache-Error-Log-Format. Jeder Protokolleintrag weist standardmäßig das folgende Muster auf: „[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}“.

  • SYSLOG— Das RFC3164 Syslog-Format. Jeder Protokolleintrag weist standardmäßig das folgende Muster auf: „%{timestamp} %{hostname} %{program}[%{processid}]: %{message}“.

matchPattern

Das reguläre Ausdrucksmuster, mit dem Werte aus den Protokolleinträgen extrahiert werden. Diese Einstellung wird verwendet, wenn Ihr Protokolleintrag nicht eines der vordefinierten Protokollformate aufweist. Bei dieser Einstellung müssen Sie auch customFieldNames angeben.

customFieldNames

Die benutzerdefinierten Feldnamen, die als Schlüssel in den einzelnen JSON-Schlüssel-Wert-Paaren verwendet werden. Mit dieser Einstellung können Sie Feldnamen für Werte definieren, die aus matchPattern extrahiert wurden, oder die Standardfeldnamen von vordefinierten Protokollformaten überschreiben.

Beispiel : LOGTOJSON-Konfiguration

Nachfolgend ein Beispiel einer LOGTOJSON-Konfiguration für einen Apache Common Log-Eintrag, der in ein JSON-Format konvertiert wurde:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

Vor der Konvertierung:

64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

Nach der Konvertierung:

{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
Beispiel : LOGTOJSON-Konfiguration mit benutzerdefinierten Feldern

Im Folgenden ein weiteres Beispiel einer LOGTOJSON-Konfiguration:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

Durch diese Konfigurationseinstellung wird der Apache Common Log-Eintrag aus dem vorherigen Beispiel wie folgt in ein JSON-Format konvertiert:

{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
Beispiel : Konvertieren eines Apache Common Log-Eintrags

Bei der folgenden Ablaufkonfiguration wird ein Apache Common Log-Eintrag in einen einzeiligen Datensatz im JSON-Format umgewandelt:

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
Beispiel : Konvertieren mehrzeiliger Datensätze

Bei der folgenden Ablaufkonfiguration werden mehrzeilige Datensätze analysiert, deren erste Zeile mit „[SEQUENCE=“ beginnt. Jeder Datensatz wird in einen einzeiligen Datensatz konvertiert. Anschließend werden Werte aus dem Datensatz basierend auf einem Tabulatortrennzeichen extrahiert. Die extrahierten Werte werden zu angegebenen customFieldNames-Werten zugeordnet und ergeben so einen einzeiligen Datensatz im JSON-Format.

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }
Beispiel : LOGTOJSON-Konfiguration mit Übereinstimmungsmuster

Nachfolgend ein Beispiel einer LOGTOJSON-Konfiguration für einen Apache Common Log-Eintrag, der in das JSON-Format konvertiert wurde. Das letzte Feld (Bytes) wurde ausgelassen:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

Vor der Konvertierung:

123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

Nach der Konvertierung:

{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

Verwenden Sie CLI-Befehle für Agenten

Automatisches Startup des Agenten beim Systemstart:

sudo chkconfig aws-kinesis-agent on

Prüfen des Status des Agenten:

sudo service aws-kinesis-agent status

Beenden des Agenten:

sudo service aws-kinesis-agent stop

Auslesen der Protokolldatei des Agenten von diesem Speicherort:

/var/log/aws-kinesis-agent/aws-kinesis-agent.log

Deinstallieren des Agenten:

sudo yum remove aws-kinesis-agent

Häufig gestellte Fragen

Gibt es einen Kinesis Agent für Windows?

Kinesis Agent für Windows ist eine andere Software als Kinesis Agent für Linux-Plattformen.

Warum verlangsamt sich Kinesis Agent und/oder RecordSendErrors nimmt zu?

Dies ist normalerweise auf die Drosselung durch Kinesis zurückzuführen. Überprüfen Sie die WriteProvisionedThroughputExceeded Metrik für Kinesis Data Streams oder die ThrottledRecords Metrik für Firehose Delivery Streams. Jede Erhöhung dieser Metriken von 0 zeigt an, dass die Stream-Grenzwerte erhöht werden müssen. Weitere Informationen finden Sie unter Kinesis-Datenstrom-Grenzwerte und Amazon Firehose Delivery Streams.

Sobald Sie die Drosselung ausgeschlossen haben, überprüfen Sie, ob der Kinesis Agent so konfiguriert ist, dass er eine große Menge kleiner Dateien durchsucht. Es gibt eine Verzögerung, wenn der Kinesis Agent eine neue Datei überwacht, daher sollte der Kinesis-Agent eine kleine Menge größerer Dateien überwachen. Versuchen Sie, Ihre Protokolldateien in größeren Dateien zusammenzufassen.

Warum erhalte ich java.lang.OutOfMemoryError -Ausnahmen?

Kinesis Agent verfügt nicht über genügend Arbeitsspeicher, um seinen aktuellen Workload zu bewältigen. Versuchen Sie, JAVA_START_HEAP und JAVA_MAX_HEAP in /usr/bin/start-aws-kinesis-agent zu erhöhen und den Agenten neu zu starten.

Warum erhalte ich IllegalStateException : connection pool shut down-Ausnahmen?

Kinesis Agent verfügt nicht über genügend Verbindungen, um seinen aktuellen Workload zu bewältigen. Versuchen Sie, maxConnections und maxSendingThreads in den allgemeinen Konfigurationseinstellungen des Agenten unter /etc/aws-kinesis/agent.json zu erhöhen. Der Standardwert für diese Felder ist das 12-fache der verfügbaren Laufzeitprozessoren. Weitere Informationen zu den Einstellungen für erweiterte Agentenkonfigurationen finden Sie unter AgentConfiguration.java.

Wie kann ich ein anderes Problem mit Kinesis Agent beheben?

DEBUG-Level-Protokolle können in /etc/aws-kinesis/log4j.xml aktiviert werden.

Wie sollte ich Kinesis Agent konfigurieren?

Je kleiner das maxBufferSizeBytes, desto häufiger sendet der Kinesis Agent Daten. Dies kann nützlich sein, da es die Lieferzeit von Datensätzen verkürzt, aber es erhöht auch die Anfragen pro Sekunde an Kinesis.

Warum sendet Kinesis Agent doppelte Datensätze?

Dies ist auf eine Fehlkonfiguration bei der Dateiüberwachung zurückzuführen. Stellen Sie sicher, dass jedes fileFlow’s filePattern nur einer Datei entspricht. Dies kann auch auftreten, wenn der verwendete logrotate-Modus im copytruncate-Modus ist. Versuchen Sie, den Modus auf den Standard- oder Erstellungsmodus zu ändern, um Duplikate zu vermeiden. Weitere Informationen zum Umgang mit doppelten Datensätzen finden Sie unter Umgang mit doppelten Datensätzen.