Écriture dans Amazon Kinesis Data Streams à l'aide de l'agent Kinesis - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Écriture dans Amazon Kinesis Data Streams à l'aide de l'agent Kinesis

L'agent Kinesis est une application logicielle Java autonome qui permet de collecter et d'envoyer facilement des données à Kinesis Data Streams. L'agent surveille en permanence un ensemble de fichiers et envoie de nouvelles données à votre flux. L'agent gère les rotations de fichier, les points de contrôle et les nouvelles tentatives après échec. Il diffuse toutes vos données de manière fiable, rapide et simple. Il émet également des CloudWatch métriques Amazon pour vous aider à mieux surveiller et résoudre les problèmes liés au processus de streaming.

Par défaut, les enregistrements sont analysés à partir de chaque fichier sur la base du caractère de saut de ligne ('\n'). Toutefois, l'agent peut également être configuré pour analyser les enregistrements de plusieurs lignes (consultez Paramètres de configuration de l'agent).

Cet agent peut être installé dans des environnements basés sur des serveurs Linux tels que des serveurs Web, serveurs de journaux ou encore serveurs de base de données. Après avoir installé l'agent, configurez-le en spécifiant les fichiers à surveiller et le flux pour les données. Une fois que l'agent est configuré, il collecte de façon durable les données depuis les fichiers et les envoie en toute fiabilité dans le flux.

Prérequis

Téléchargement et installation de l'agent

Commencez par vous connecter à votre instance. Pour plus d'informations, consultez Connexion à votre instance dans le Guide de l'utilisateur Amazon EC2 pour les instances Linux. Si vous rencontrez des difficultés de connexion, consultez la rubrique Résolution des problèmes de connexion à votre instance dans le Guide de l'utilisateur Amazon EC2 pour les instances Linux.

Pour configurer l'agent à l'aide d'Amazon Linux AMI

Utilisez la commande suivante pour télécharger et installer l'agent :

sudo yum install –y aws-kinesis-agent
Pour configurer l'agent à l'aide de Red Hat Enterprise Linux

Utilisez la commande suivante pour télécharger et installer l'agent :

sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
Pour configurer l'agent à l'aide de GitHub
  1. Téléchargez l'agent depuis awlabs/ amazon-kinesis-agent.

  2. Installez l'agent en accédant au répertoire de téléchargement et en exécutant la commande suivante :

    sudo ./setup --install
Configuration de l'agent dans un conteneur Docker

L'agent Kinesis peut également être exécuté dans un conteneur via la base de conteneurs amazonlinux. Utilisez le Dockerfile suivant, puis exécutez docker build.

FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]

Configuration et démarrage de l'agent

Pour configurer et démarrer l'agent
  1. Ouvrez le fichier de configuration et modifiez-le (en tant que super-utilisateur si vous utilisez les autorisations d'accès fichier par défaut) :/etc/aws-kinesis/agent.json

    Dans ce fichier de configuration, spécifiez les fichiers ( "filePattern" ) à partir desquels l'agent collecte les données, et le nom du flux ( "kinesisStream" ) dans lequel l'agent envoie les données. Notez que le nom de fichier est un modèle et que l'agent reconnaît les rotations de fichier. Vous pouvez effectuer une rotation de fichier ou créer de nouveaux fichiers pas plus d'une fois par seconde. L'agent utilise l'horodatage de création de fichier pour déterminer quels fichiers doivent être suivis et tracés dans votre flux. Créer de nouveaux fichiers ou effectuer une rotation de fichier plus souvent qu'une fois par seconde ne permet pas à l'agent de différencier correctement les fichiers.

    { "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "yourkinesisstream" } ] }
  2. Démarrez l'agent manuellement :

    sudo service aws-kinesis-agent start
  3. (Facultatif) Configurez l'agent pour qu'il soit lancé au démarrage du système :

    sudo chkconfig aws-kinesis-agent on

L'agent fonctionne maintenant en arrière-plan en tant que service système. Il surveille en permanence les fichiers spécifiés et envoie des données dans le flux spécifié. L'activité de l'agent est enregistrée dans /var/log/aws-kinesis-agent/aws-kinesis-agent.log.

Paramètres de configuration de l'agent

L'agent prend en charge les deux paramètres de configuration obligatoires filePattern et kinesisStream, plus les paramètres de configuration facultatifs des fonctionnalités supplémentaires. Vous pouvez spécifier aussi bien une configuration obligatoire que facultative dans /etc/aws-kinesis/agent.json.

Chaque fois que vous modifiez le fichier de configuration, vous devez arrêter et démarrer l'agent en utilisant les commandes suivantes :

sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

Vous pouvez également utiliser la commande suivante :

sudo service aws-kinesis-agent restart

Les paramètres de configuration générale sont indiqués ci-après.

Paramètre de configuration Description
assumeRoleARN

ARN du rôle à être assumé par l'utilisateur. Pour plus d'informations, consultez la section Accès délégué entre AWS comptes à l'aide de rôles IAM dans le guide de l'utilisateur IAM.

assumeRoleExternalId

Identifiant facultatif qui détermine qui peut assumer le rôle. Pour en savoir plus, consultez la rubrique Procédure d'utilisation d'un ID externe dans le Guide de l'utilisateur IAM.

awsAccessKeyId

AWS ID de clé d'accès qui remplace les informations d'identification par défaut. Ce paramètre est prioritaire sur tous les autres fournisseurs d'informations d'identification.

awsSecretAccessKey

AWS clé secrète qui remplace les informations d'identification par défaut. Ce paramètre est prioritaire sur tous les autres fournisseurs d'informations d'identification.

cloudwatch.emitMetrics

Permet à l'agent d'émettre des métriques CloudWatch s'il est défini (true).

Valeur par défaut : true

cloudwatch.endpoint

Le point de terminaison régional pour CloudWatch.

Par défaut : monitoring.us-east-1.amazonaws.com

kinesis.endpoint

Point de terminaison régional pour Kinesis Data Streams.

Par défaut : kinesis.us-east-1.amazonaws.com

Les paramètres de configuration de flux sont indiqués ci-après.

Paramètre de configuration Description
dataProcessingOptions

Liste des options de traitement appliquées à chaque enregistrement analysé avant qu'il ne soit envoyé au flux. Les options de traitement sont exécutées dans l'ordre spécifié. Pour de plus amples informations, veuillez consulter Utilisation de l'agent pour prétraiter les données.

kinesisStream

[Obligatoire] Nom du flux.

filePattern

[Obligatoire] Le répertoire et le modèle de fichier qui doivent correspondre pour être récupérés par l'agent. Pour tous les fichiers correspondant à ce modèle, l'autorisation en lecture doit être accordée à aws-kinesis-agent-user. Pour le répertoire contenant les fichiers, les autorisations en lecture et exécution doivent être accordées à aws-kinesis-agent-user.

initialPosition

Position initiale à partir de laquelle le fichier a commencé à être analysé. Les valeurs valides sont START_OF_FILE et END_OF_FILE.

Par défaut : END_OF_FILE

maxBufferAgeMillis

Durée maximale, en millisecondes, pendant laquelle l'agent met les données en tampon avant de les envoyer dans le flux.

Plage de valeurs : 1 000 à 900 000 (1 seconde à 15 minutes)

Par défaut : 60 000 (1 minute)

maxBufferSizeBytes

Taille maximale, en octets, pour laquelle l'agent met les données en tampon avant de les envoyer dans le flux.

Plage de valeurs : 1 à 4 194 304 (4 Mo)

Par défaut : 4 194 304 (4 Mo)

maxBufferSizeRecords

Nombre maximal d'enregistrements pour lequel l'agent met les données en tampon avant de les envoyer dans le flux.

Plage de valeurs : 1 à 500

Par défaut : 500

minTimeBetweenFilePollsMillis

Fréquence, en millisecondes, à laquelle l'agent interroge et analyse les fichiers surveillés pour rechercher les nouvelles données.

Plage de valeurs : 1 ou plus

Par défaut : 100

multiLineStartPattern

Modèle d'identification du début d'un enregistrement. Un enregistrement se compose d'une ligne qui correspond au modèle et de lignes suivantes qui ne correspondent pas au modèle. Les valeurs valides sont les expressions régulières. Par défaut, chaque nouvelle ligne comprise dans les fichiers journaux est analysée comme étant un enregistrement.

partitionKeyOption

Méthode de génération de la clé de partition. Les valeurs valides sont RANDOM (entier généré uniquement aléatoirement) et DETERMINISTIC (une valeur de hachage calculée à partir des données).

Par défaut : RANDOM

skipHeaderLines

Nombre de lignes que l'agent doit ignorer lors de l'analyse au début des fichiers surveillés.

Plage de valeurs : 0 ou plus

Par défaut : 0 (zéro)

truncatedRecordTerminator

Chaîne que l'agent utilise pour tronquer un enregistrement analysé lorsque la taille de ce dernier dépasse la taille limite d'un enregistrement Kinesis Data Streams. (1,000 Ko)

Par défaut : '\n' (nouvelle ligne)

Surveillance de plusieurs répertoires de fichiers et écriture dans plusieurs flux

En spécifiant plusieurs paramètres de configuration de flux, vous pouvez configurer l'agent pour surveiller plusieurs répertoires de fichiers et envoyer des données dans plusieurs flux. Dans l'exemple de configuration suivant, l'agent surveille deux répertoires de fichiers et envoie des données respectivement à un flux Kinesis et à un flux de diffusion Firehose. Notez que vous pouvez spécifier différents points de terminaison pour Kinesis Data Streams et Firehose afin que votre flux Kinesis et votre flux de diffusion Firehose ne soient pas nécessairement situés dans la même région.

{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

Pour des informations plus détaillées sur l'utilisation de l'agent avec Firehose, consultez Writing to Amazon Data Firehose with Kinesis Agent.

Utilisation de l'agent pour prétraiter les données

L'agent peut prétraiter les enregistrements analysés à partir des fichiers surveillés avant de les envoyer dans votre flux. Vous pouvez activer cette fonctionnalité en ajoutant le paramètre de configuration dataProcessingOptions à votre flux de fichiers. Une ou plusieurs options de traitement peuvent être ajoutées. Elles sont exécutées dans l'ordre spécifié.

L'agent prend en charge les options de traitement répertoriées ci-après. Etant donné que l'agent est open source, vous pouvez continuer à développer et étendre ses options de traitement. Vous pouvez télécharger l'agent à partit de Kinesis Agent.

Options de traitement
SINGLELINE

Convertit un enregistrement de plusieurs lignes en un enregistrement d'une seule ligne en supprimant les caractères de saut de ligne, les espaces de début et les espaces de fin.

{ "optionName": "SINGLELINE" }
CSVTOJSON

Convertit un enregistrement du format séparé par délimiteur au format JSON.

{ "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
customFieldNames

[Obligatoire] Noms de domaine utilisés comme clés dans chaque paire clé-valeur JSON. Par exemple, si vous spécifiez ["f1", "f2"], l'enregistrement « v1, v2 » est converti en {"f1":"v1","f2":"v2"}.

delimiter

Chaîne utilisée comme délimiteur dans l'enregistrement. La valeur par défaut est une virgule (,).

LOGTOJSON

Convertit un enregistrement du format de journal au format JSON. Les formats de journal pris en charge sont Apache Common Log, Apache Combined Log, Apache Error Log et RFC3164 Syslog.

{ "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
logFormat

[Obligatoire] Format d'entrée de journal. Les valeurs admises sont les suivantes :

  • COMMONAPACHELOG : le format de journal courant d'Apache. Chaque entrée de journal a le schéma suivant par défaut : « %{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} ».

  • COMBINEDAPACHELOG : le format de journal combiné d'Apache. Chaque entrée de journal a le schéma suivant par défaut : « %{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent} ».

  • APACHEERRORLOG : le format de journal d'erreurs d'Apache. Chaque entrée de journal a le schéma suivant par défaut : « [%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message} ».

  • SYSLOG : le format Syslog RFC3164. Chaque entrée de journal a le schéma suivant par défaut : « %{timestamp} %{hostname} %{program}[%{processid}]: %{message} ».

matchPattern

Modèle d'expression régulière utilisé pour extraire les valeurs des entrées de journal. Ce paramètre est utilisé si votre entrée de journal n'a pas l'un des formats de journalisation prédéfinis. Si ce paramètre est utilisé, vous devez également spécifier customFieldNames.

customFieldNames

Noms de champ personnalisés utilisés comme clés dans chaque paire clé-valeur JSON. Vous pouvez utiliser ce paramètre pour définir les noms de champ pour les valeurs extraites de matchPattern, ou remplacer les noms de champ par défaut des formats de journalisation prédéfinis.

Exemple  : Configuration LOGTOJSON

Voici un exemple de configuration LOGTOJSON pour une entrée au format Journal courant Apache convertie au format JSON :

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

Avant la conversion :

64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

Après la conversion :

{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
Exemple  : Configuration LOGTOJSON avec champs personnalisés

Voici un autre exemple de configuration LOGTOJSON :

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

Avec ce paramètre de configuration, la même entrée au format Journal courant Apache de journal issue de l'exemple précédent est convertie au format JSON comme suit :

{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
Exemple  : Conversion d'une entrée au format Journal courant Apache

La configuration de flux suivante convertit une entrée au format Journal courant Apache en un enregistrement d'une seule ligne au format JSON :

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
Exemple  : Conversion des enregistrements de plusieurs lignes

La configuration de flux suivante analyse les enregistrements de plusieurs lignes dont la première ligne commence par « [SEQUENCE= ». Chaque enregistrement est d'abord converti en un enregistrement d'une seule ligne. Les valeurs sont ensuite extraites de l'enregistrement sur la base d'un séparateur tabulation. Les valeurs extraites sont mappées à des valeurs customFieldNames spécifiées pour former un enregistrement d'une seule ligne au format JSON.

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }
Exemple  : Configuration LOGTOJSON avec modèle de correspondance

Voici un exemple de configuration LOGTOJSON pour une entrée au format Journal courant Apache convertie au format JSON, avec le dernier champ (octets) omis :

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

Avant la conversion :

123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

Après la conversion :

{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

Interface de ligne de commande de l'agent

Lancer automatiquement l'agent au démarrage du système :

sudo chkconfig aws-kinesis-agent on

Vérifier le statut de l'agent :

sudo service aws-kinesis-agent status

Arrêter l'agent :

sudo service aws-kinesis-agent stop

Lire le fichier journal de l'agent à partir de cet emplacement :

/var/log/aws-kinesis-agent/aws-kinesis-agent.log

Désinstaller l'agent :

sudo yum remove aws-kinesis-agent

FAQ

Existe-t-il un Kinesis Agent pour Windows ?

Kinesis Agent pour Windows est un logiciel différent de Kinesis Agent pour les plateformes Linux.

Pourquoi Kinesis Agent ralentit-il ou RecordSendErrors augmente-t-il ?

Cela est généralement dû à une limitation de Kinesis. Vérifiez la WriteProvisionedThroughputExceeded métrique pour Kinesis Data Streams ou ThrottledRecords la métrique pour Firehose Delivery Streams. Toute augmentation de ces métriques par rapport à 0 indique que les limites de flux doivent être augmentées. Pour plus d'informations, consultez les rubriques Limites des flux de données Kinesis (français non garanti) et Flux de diffusion Amazon Firehose (français non garanti).

Une fois que vous avez exclu la limitation, vérifiez si Kinesis Agent est configuré pour suivre un grand nombre de petits fichiers. Il y a un délai lorsque Kinesis Agent suit un nouveau fichier, c'est pourquoi Kinesis Agent doit suivre un petit nombre de fichiers plus volumineux. Essayez de regrouper vos fichiers journaux dans des fichiers plus volumineux.

Pourquoi est-ce que je reçois des exceptions java.lang.OutOfMemoryError  ?

Kinesis Agent ne dispose pas de suffisamment de mémoire pour gérer sa charge de travail actuelle. Essayez d'augmenter JAVA_START_HEAP et JAVA_MAX_HEAP dans /usr/bin/start-aws-kinesis-agent et de redémarrer l'agent.

Pourquoi est-ce que je reçois des exceptions IllegalStateException : connection pool shut down ?

Kinesis Agent ne dispose pas de suffisamment de connexions pour gérer sa charge de travail actuelle. Essayez d'augmenter maxConnections et maxSendingThreads dans les paramètres de configuration de votre agent général à /etc/aws-kinesis/agent.json. La valeur par défaut de ces champs est 12 fois supérieure au nombre de processeurs d'exécution disponibles. Consultez AgentConfiguration.java pour en savoir plus sur les paramètres de configuration avancés des agents.

Comment puis-je déboguer un autre problème avec Kinesis Agent ?

Les journaux de niveau DEBUG peuvent être activés dans /etc/aws-kinesis/log4j.xml.

Comment dois-je configurer Kinesis Agent ?

Plus la taille de maxBufferSizeBytes est petite, plus Kinesis Agent envoie fréquemment des données. Cela peut être une bonne chose, car cela réduit le délai de livraison des enregistrements, mais cela augmente également le nombre de requêtes par seconde adressées à Kinesis.

Pourquoi Kinesis Agent envoie-t-il des enregistrements dupliqués ?

Cela se produit en raison d'une mauvaise configuration dans le suivi des fichiers. Assurez-vous que chaque fileFlow’s filePattern ne correspond qu'à un seul fichier. Cela peut également se produire si le mode logrotate utilisé est en mode copytruncate. Essayez de remplacer le mode par défaut ou de créer le mode pour éviter les doublons. Pour plus d'informations sur la gestion des enregistrements dupliqués, consultez la rubrique Gestion des enregistrements dupliqués (français non garanti).