Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Escrivir en Amazon Kinesis Data Streams mediante el agente de Kinesis
El agente de Kinesis es una aplicación de software Java independiente que ofrece una forma sencilla de recopilar y enviar datos a Kinesis Data Streams. El agente monitoriza constantemente un conjunto de archivos y envía nuevos datos a su secuencia. El agente se encarga de rotar los archivos, crear puntos de restauración y realizar reintentos cuando se producen errores. El agente entrega todos los datos de manera confiable, puntual y sencilla. También emite CloudWatch métricas de Amazon para ayudarte a supervisar y solucionar mejor los problemas del proceso de streaming.
De forma predeterminada, los registros de cada archivo se analizan en función del carácter de nueva línea ('\n'
). Sin embargo, el agente también se puede configurar para analizar registros multilínea (consulte Especificar las opciones de configuración del agente).
Puede instalar el agente en entornos de servidor basados en Linux, como servidores web, de registro o de base de datos. Después de instalar el agente, configúrelo especificando los archivos que desee monitorizar y la secuencia de los datos. Una vez configurado, el agente recopila datos de los archivos de forma duradera y los envía de forma confiable a la secuencia.
Temas
Cumplir con los requisitos previos para el agente de Kinesis
-
Su sistema operativo debe ser la versión 2015.09 o posterior de Amazon Linux AMI o la versión 7 o posterior de Red Hat Enterprise Linux.
-
Si utilizas Amazon EC2 para ejecutar tu agente, lanza tu EC2 instancia.
-
Administre sus AWS credenciales mediante uno de los siguientes métodos:
-
Especifique un rol de IAM al lanzar la EC2 instancia.
-
Especifique AWS las credenciales al configurar el agente (consulte awsAccessKeyID y awsSecretAccessclave).
-
/etc/sysconfig/aws-kinesis-agent
Edítelo para especificar su región y sus claves de AWS acceso. -
Si la EC2 instancia está en una AWS cuenta diferente, cree una función de IAM para proporcionar acceso al servicio Kinesis Data Streams y especifique esa función al configurar el agente (consulte AssumeroLearn and Id). assumeRoleExternal Utilice uno de los métodos anteriores para especificar las AWS credenciales de un usuario de la otra cuenta que tenga permiso para asumir este rol.
-
-
El rol o AWS las credenciales de IAM que especifique deben tener permiso para realizar la operación de Kinesis Data PutRecordsStreams para que el agente envíe datos a su transmisión. Si habilita la CloudWatch supervisión del agente, también necesitará permiso para realizar la CloudWatch PutMetricDataoperación. Para obtener más información, consulte Control del acceso a los recursos de Amazon Kinesis Data Streams mediante IAMSupervise el estado de los agentes de Kinesis Data Streams con Amazon CloudWatch, y Control de CloudWatch acceso.
Descargar e instalar el agente
Primero, conéctese a la instancia. Para obtener más información, consulte Connect to Your Instance en la Guía del EC2 usuario de Amazon. Si tiene problemas para conectarse, consulte Solución de problemas de conexión a su instancia en la Guía del EC2 usuario de Amazon.
Para configurar el agente a través de la AMI de Amazon Linux
Use el siguiente comando para descargar e instalar el agente:
sudo yum install –y aws-kinesis-agent
Para configurar el agente en Red Hat Enterprise Linux
Use el siguiente comando para descargar e instalar el agente:
sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
Para configurar el agente mediante GitHub
-
Descargue el agente desde awlabs/ amazon-kinesis-agent
. -
Instale el agente. Para ello, diríjase al directorio de descargas y ejecute el siguiente comando:
sudo ./setup --install
Configuración del agente en un contenedor de Docker
El agente de Kinesis también puede ejecutarse en un contenedor además de a través de la base de contenedores amazonlinux. Utilice el siguiente Dockerfile y ejecute docker build
.
FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]
Configurar e iniciar el agente
Configuración e inicio del agente
-
Abra y edite el archivo de configuración (como superusuario si utiliza permisos de acceso de archivo predeterminado):
/etc/aws-kinesis/agent.json
En este archivo de configuración, especifique los archivos (
"filePattern"
) desde los que el agente deberá recopilar datos y el nombre de la secuencia ("kinesisStream"
) a la que deberá enviarlos. Tenga en cuenta que el nombre de archivo es un patrón y que el agente reconoce rotaciones de archivos. No puede rotar más de un archivo ni crear más de uno nuevo por segundo. El agente utiliza la marca temporal de creación de archivos para determinar a qué archivos realizarles seguimiento y poner en fila en la secuencia. Crear nuevos archivos o rotarlos más de una vez por segundo evita que el agente pueda diferenciarlos correctamente.{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "yourkinesisstream
" } ] } -
Comience el agente de forma manual:
sudo service aws-kinesis-agent start
-
Configure el agente para iniciarse al arrancar el sistema (opcional):
sudo chkconfig aws-kinesis-agent on
El agente ya está se ejecutando como un servicio de sistema en segundo plano. Monitoriza constantemente los archivos especificados y envía datos a la secuencia especificada. La auditoría de actividad se registra en /var/log/aws-kinesis-agent/aws-kinesis-agent.log
.
Especificar las opciones de configuración del agente
El agente admite las dos opciones de configuración obligatorias, filePattern
y kinesisStream
, además de configuraciones opcionales para activar características adicionales. Las opciones de configuración obligatorias y opcionales se especifican en /etc/aws-kinesis/agent.json
.
Cada vez que cambie el archivo de configuración, debe detener y comenzar el agente con los siguientes comandos:
sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start
También puede hacerlo con el siguiente comando:
sudo service aws-kinesis-agent restart
Las opciones de configuración generales son las siguientes.
Opción de configuración | Descripción |
---|---|
assumeRoleARN |
El ARN del rol que asumirá el usuario. Para obtener más información, consulte Delegar el acceso a todas AWS las cuentas mediante funciones de IAM en la Guía del usuario de IAM. |
assumeRoleExternalId |
Un identificador opcional que determina quién puede asumir el rol. Para obtener más información, consulte Cómo utilizar un ID externo en la Guía del usuario de IAM. |
awsAccessKeyId |
AWS ID de clave de acceso que anula las credenciales predeterminadas. Este ajuste tiene prioridad sobre los demás proveedores de credenciales. |
awsSecretAccessKey |
AWS clave secreta que anula las credenciales predeterminadas. Este ajuste tiene prioridad sobre los demás proveedores de credenciales. |
cloudwatch.emitMetrics |
Permite que el agente emita métricas CloudWatch si se ha establecido (true). Predeterminado: true |
cloudwatch.endpoint |
El punto final regional de CloudWatch. Valor predeterminado: |
kinesis.endpoint |
El punto de conexión regional para Kinesis Data Streams. Valor predeterminado: |
Las opciones de configuración de flujo son las siguientes.
Opción de configuración | Descripción |
---|---|
dataProcessingOptions |
La lista de opciones de procesamiento aplicadas a cada registro analizado antes de enviarlo a la secuencia. Las opciones de procesamiento se realizan en el orden especificado. Para obtener más información, consulte Usar el agente para preprocesar el agente. |
kinesisStream |
[Obligatorio] El nombre de la secuencia. |
filePattern |
[Obligatorio] El patrón del directorio y del archivo que debe coincidir para que el agente lo capte. Se debe conceder permisos de lectura a |
initialPosition |
La posición inicial desde la que el archivo comenzó a ser analizado. Los valores válidos son Valor predeterminado: |
maxBufferAgeMillis |
El tiempo máximo, en milisegundos, durante el cual el agente almacena los datos en búfer antes de enviarlos a la secuencia. Intervalo de valores: 1000 a 900 000 (1 segundo a 15 minutos) Valor predeterminado: 60 000 (1 minuto) |
maxBufferSizeBytes |
La cantidad de datos en bytes que el agente almacena en búfer antes de enviarlos a la secuencia. Intervalo de valores: 1 a 4 194 304 (4 MB) Valor predeterminado: 4 194 304 (4 MB) |
maxBufferSizeRecords |
La cantidad máxima de registros en datos que el agente almacena en búfer antes de enviarlos a la secuencia. Intervalo de valores: 1 a 500 Predeterminado: 500 |
minTimeBetweenFilePollsMillis |
El intervalo de tiempo, en milisegundos, en el que el agente sondea y analiza los archivos monitorizados para identificar datos nuevos. Intervalo de valores: 1 o más Predeterminado: 100 |
multiLineStartPattern |
El patrón para identificar el comienzo de un registro. Un registro consta de una línea que coincide con el patrón y de líneas siguientes que no coinciden con el patrón. Los valores válidos son expresiones regulares. De forma predeterminada, cada línea en los archivos de registro se analiza como un registro. |
partitionKeyOption |
El método para generar la clave de partición. Los valores válidos son Valor predeterminado: |
skipHeaderLines |
La cantidad de líneas de los archivos monitorizados, a partir de la primera, que el agente debe omitir en el momento de analizarlos. Intervalo de valores: 0 o más Cantidad predeterminada: 0 (cero) |
truncatedRecordTerminator |
La cadena que utiliza el agente para truncar un registro analizado cuando su tamaño supera el límite de tamaño de registros de Kinesis Data Streams. (1000 KB) Valor predeterminado: |
Monitorear varios directorios de archivos y escribir en varias secuencias
Puede configurar el agente para que monitorice varios directorios de archivos y envíe datos a varias secuencias especificando varias opciones de configuración de secuencia. En el siguiente ejemplo de configuración, el agente supervisa dos directorios de archivos y envía datos a un flujo de Kinesis y a un flujo de entrega de Firehose, respectivamente. Tenga en cuenta que puede especificar diferentes puntos de conexión para Kinesis Data Streams y Firehose para que su flujo de Kinesis y su flujo de entrega de Firehose no tengan que estar en la misma región.
{ "cloudwatch.emitMetrics":
true
, "kinesis.endpoint": "https://your/kinesis/endpoint
", "firehose.endpoint": "https://your/firehose/endpoint
", "flows": [ { "filePattern": "/tmp/app1.log*
", "kinesisStream": "yourkinesisstream
" }, { "filePattern": "/tmp/app2.log*
", "deliveryStream": "yourfirehosedeliverystream
" } ] }
Para más información sobre el uso del agente con Firehose, consulte Escritura en Amazon Data Firehose con el agente de Kinesis.
Usar el agente para preprocesar el agente
El agente puede preprocesar los registros previamente analizados de los archivos monitorizados antes de enviarlos a la secuencia. Para habilitar esta característica, añada la opción de configuración dataProcessingOptions
al flujo de archivos. Se pueden agregar una o más opciones de procesamiento, que se ejecutarán en el orden especificado.
El agente es compatible con las siguientes opciones de procesamiento. Al ser de código abierto, puede desarrollar y ampliar las opciones de procesamiento del agente. Puede descargar el agente desde el agente de Kinesis
Opciones de procesamiento
SINGLELINE
-
Elimina caracteres de nueva línea y espacios al principio y al final de las líneas para convertir un registro multilínea en un registro de una sola línea.
{ "optionName": "SINGLELINE" }
CSVTOJSON
-
Convierte un registro de formato con separaciones mediante delimitadores en un registro de formato JSON.
{ "optionName": "CSVTOJSON", "customFieldNames": [ "
field1
", "field2
",...
], "delimiter": "yourdelimiter
" }customFieldNames
-
[Obligatorio] Los nombres de campos utilizados como claves en cada par de valores de clave JSON. Por ejemplo, si especifica
["f1", "f2"]
, el registro "v1, v2" se convertirá en{"f1":"v1","f2":"v2"}
. delimiter
-
La cadena utilizada como delimitador en el registro. El valor predeterminado es una coma (,).
LOGTOJSON
-
Convierte un registro con un formato de registro en un registro con formato JSON. Los formatos de registro admitidos son Apache Common Log, Apache Combined Log, Apache Error Log y RFC3164 Syslog.
{ "optionName": "LOGTOJSON", "logFormat": "
logformat
", "matchPattern": "yourregexpattern
", "customFieldNames": [ "field1
", "field2
",…
] }logFormat
-
[Obligatorio] El formato de entrada del registro. Los valores posibles son los siguientes:
-
COMMONAPACHELOG
: formato común de registro de Apache. Cada entrada de registro sigue el siguiente patrón de forma predeterminada: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}
". -
COMBINEDAPACHELOG
: formato combinado de registro de Apache. Cada entrada de registro sigue el siguiente patrón de forma predeterminada: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}
". -
APACHEERRORLOG
: formato de registro de errores de Apache. Cada entrada de registro sigue el siguiente patrón de forma predeterminada: "[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}
". -
SYSLOG
— El formato RFC3164 Syslog. Cada entrada de registro sigue el siguiente patrón de forma predeterminada: "%{timestamp} %{hostname} %{program}[%{processid}]: %{message}
".
-
matchPattern
-
El patrón de expresiones regulares utilizado para extraer valores de entradas de registro. Este ajuste se utiliza si la entrada de registro no tiene uno de los formatos de registro predefinidos. Si se utiliza este ajuste, también tendrá que especificar
customFieldNames
. customFieldNames
-
Los nombres de campos utilizados como claves en cada par de valores de clave JSON. Utilice esta opción para definir nombres de campos para valores extraídos de
matchPattern
, o sobrescriba los nombres de campos de los formatos de logs predefinidos.
ejemplo : Configuración LOGTOJSON
Este es un ejemplo de configuración LOGTOJSON
de una entrada de registro en Formato común de registro de Apache convertida a formato JSON:
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }
Antes de la conversión:
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
Después de la conversión:
{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
ejemplo : Configuración LOGTOJSON con campos personalizados
Este es otro ejemplo de configuración LOGTOJSON
:
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }
Con esta configuración, la misma entrada de registro con Formato común de registro de Apache del ejemplo anterior se convierte a formato JSON de la siguiente manera:
{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
ejemplo : Convertir una entrada de registro con Formato común de registro de Apache
La siguiente configuración de flujo convierte la entrada de registro con Formato común de registro de Apache en un registro de una línea en formato JSON:
{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "my-stream
", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
ejemplo : Convertir registros multilínea
La siguiente configuración de flujo analiza aquellos registros multilínea cuya primera línea comience por "[SEQUENCE=
". Primero, el registro se convierte en un registro de una línea. Después, se extraen los valores del registro basándose en tabulaciones delimitadoras. Finalmente, los valores extraídos se asignan a valores customFieldNames
específicos para formar un registro de una línea en formato JSON.
{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "my-stream
", "multiLineStartPattern": "\\[SEQUENCE=
", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1
", "field2
", "field3
" ], "delimiter": "\\t
" } ] } ] }
ejemplo : Configuración LOGTOJSON con patrón de coincidencia
Este es un ejemplo de una configuración de entrada de registro con Formato común de registro de Apache LOGTOJSON
convertida a formato JSON con el último campo (bytes) omitido:
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }
Antes de la conversión:
123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200
Después de la conversión:
{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}
Usar los comandos de la CLI
Iniciar automáticamente al agente al arrancar el sistema:
sudo chkconfig aws-kinesis-agent on
Compruebe el estado del agente:
sudo service aws-kinesis-agent status
Detener el agente:
sudo service aws-kinesis-agent stop
Leer el archivo de registro del agente desde esta ubicación:
/var/log/aws-kinesis-agent/aws-kinesis-agent.log
Desinstalar el agente:
sudo yum remove aws-kinesis-agent
Preguntas frecuentes
¿Hay un agente de Kinesis para Windows?
El agente de Kinesis para Windows es un software diferente del agente de Kinesis para plataformas Linux.
¿Por qué se ralentiza el agente de Kinesis o aumenta el valor de RecordSendErrors
?
Normalmente esto se debe a la limitación de Kinesis. Compruebe la métrica WriteProvisionedThroughputExceeded
para Kinesis Data Streams o la métrica ThrottledRecords
para flujos de entrega de Firehose. Cualquier aumento a partir de 0 en estas métricas indica que es necesario aumentar los límites de flujo. Para obtener más información, consulte Límites de flujos de datos de Kinesis y flujos de entrega de Amazon Firehose.
Una vez que descarte la limitación de velocidad, compruebe si el agente de Kinesis está configurado para enviar a la cola una gran cantidad de archivos pequeños. Se produce un retraso cuando el agente de Kinesis sigue un archivo nuevo, por lo que el agente de Kinesis debería seguir una pequeña cantidad de archivos de mayor tamaño. Intente consolidar los archivos de registro en archivos más grandes.
¿Por qué se producen excepciones java.lang.OutOfMemoryError
?
El agente de Kinesis no tiene memoria suficiente para gestionar la carga de trabajo actual. Intente aumentar JAVA_START_HEAP
y JAVA_MAX_HEAP
en /usr/bin/start-aws-kinesis-agent
y reinicie el agente.
¿Por qué se producen excepciones IllegalStateException : connection pool shut down
?
El agente de Kinesis no tiene suficientes conexiones para gestionar la carga de trabajo actual. Intente aumentar maxConnections
y maxSendingThreads
en los ajustes generales de configuración del agente en /etc/aws-kinesis/agent.json
. El valor predeterminado para estos campos es 12 veces los procesadores de tiempo de ejecución disponibles. Consulte AgentConfiguration.java
¿Cómo puedo depurar otro problema con el agente de Kinesis?
Los registros de nivel DEBUG
pueden habilitarse en /etc/aws-kinesis/log4j.xml
.
¿Cómo debo configurar el agente de Kinesis?
Cuanto menor sea el valor de maxBufferSizeBytes
, más frecuentemente enviará datos el agente de Kinesis. Esto puede ser bueno ya que disminuye el tiempo de entrega de los registros, pero también aumenta las solicitudes por segundo a Kinesis.
¿Por qué el agente de Kinesis envía registros duplicados?
Esto ocurre debido a una mala configuración en el seguimiento de archivos. Asegúrese de que cada fileFlow’s filePattern
solo coincida con un archivo. Esto también puede ocurrir si el modo logrotate
que se está utilizando está en modo copytruncate
. Intente cambiar al modo predeterminado o al de creación para evitar la duplicación. Para obtener más información sobre la gestión de registros duplicados, consulte Handling Duplicate Records.