Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Escrivir en Amazon Kinesis Data Streams mediante el agente de Kinesis
El agente de Kinesis es una aplicación de software Java independiente que ofrece una forma sencilla de recopilar y enviar datos a Kinesis Data Streams. El agente monitoriza constantemente un conjunto de archivos y envía nuevos datos a su secuencia. El agente se encarga de rotar los archivos, crear puntos de restauración y realizar reintentos cuando se producen errores. El agente entrega todos los datos de manera confiable, puntual y sencilla. También emite métricas de Amazon CloudWatch para ayudarlo a supervisar y solucionar mejor los problemas que surjan en el proceso de streaming.
De forma predeterminada, los registros de cada archivo se analizan en función del carácter de nueva línea ('\n'
). Sin embargo, el agente también se puede configurar para analizar registros multilínea (consulte Especificar las opciones de configuración del agente).
Puede instalar el agente en entornos de servidor basados en Linux, como servidores web, de registro o de base de datos. Después de instalar el agente, configúrelo especificando los archivos que desee monitorizar y la secuencia de los datos. Una vez configurado, el agente recopila datos de los archivos de forma duradera y los envía de forma confiable a la secuencia.
Temas
- Cumplir con los requisitos previos para el agente de Kinesis
- Descargar e instalar el agente
- Configurar e iniciar el agente
- Especificar las opciones de configuración del agente
- Monitorear varios directorios de archivos y escribir en varias secuencias
- Usar el agente para preprocesar el agente
- Usar los comandos de la CLI
- Preguntas frecuentes
Cumplir con los requisitos previos para el agente de Kinesis
-
Su sistema operativo debe ser la versión 2015.09 o posterior de Amazon Linux AMI o la versión 7 o posterior de Red Hat Enterprise Linux.
-
Si utiliza Amazon EC2, lance la instancia de EC2 para ejecutar el agente.
-
Administre las credenciales de AWS utilizando uno de los siguientes métodos:
-
Especifique un rol de IAM al lanzar la instancia EC2.
-
Especifique las credenciales de AWS al configurar el agente (consulte awsAccessKeyId y awsSecretAccessKey).
-
Edite
/etc/sysconfig/aws-kinesis-agent
para especificar la región y las claves de acceso de AWS. -
Si la instancia de EC2 se encuentra en una cuenta de AWS diferente, cree un rol de IAM para proporcionar acceso al servicio Kinesis Data Streams y especifique dicho rol al configurar el agente (consulte assumeRoleARN y assumeRoleExternalId). Utilice uno de los métodos anteriores para especificar las credenciales de AWS de un usuario en la otra cuenta que tenga permiso para asumir este rol.
-
-
El rol de IAM o las credenciales de AWS que especifique deben tener permiso para realizar la operación PutRecords de Kinesis Data Streams para que el agente envíe datos a su flujo. Si habilita la supervisión de CloudWatch para el agente, también se necesita permiso para realizar la operación PutMetricData de CloudWatch. Para obtener más información, consulte Control del acceso a los recursos de Amazon Kinesis Data Streams mediante IAM, Supervise el estado de los agentes de Kinesis Data Streams con Amazon CloudWatch y Control de acceso de CloudWatch.
Descargar e instalar el agente
Primero, conéctese a la instancia. Para obtener más información, consulte Conexión a su instancia en la Guía del usuario de Amazon EC2. Si tiene problemas para conectarse, consulte Solución de problemas con la conexión a la instancia en la Guía del usuario de Amazon EC2.
Para configurar el agente a través de la AMI de Amazon Linux
Use el siguiente comando para descargar e instalar el agente:
sudo yum install –y aws-kinesis-agent
Para configurar el agente en Red Hat Enterprise Linux
Use el siguiente comando para descargar e instalar el agente:
sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
Para configurar el agente en GitHub
-
Descargue el agente de awlabs/amazon-kinesis-agent
. -
Instale el agente. Para ello, diríjase al directorio de descargas y ejecute el siguiente comando:
sudo ./setup --install
Configuración del agente en un contenedor de Docker
El agente de Kinesis también puede ejecutarse en un contenedor además de a través de la base de contenedores amazonlinux. Utilice el siguiente Dockerfile y ejecute docker build
.
FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]
Configurar e iniciar el agente
Configuración e inicio del agente
-
Abra y edite el archivo de configuración (como superusuario si utiliza permisos de acceso de archivo predeterminado):
/etc/aws-kinesis/agent.json
En este archivo de configuración, especifique los archivos (
"filePattern"
) desde los que el agente deberá recopilar datos y el nombre de la secuencia ("kinesisStream"
) a la que deberá enviarlos. Tenga en cuenta que el nombre de archivo es un patrón y que el agente reconoce rotaciones de archivos. No puede rotar más de un archivo ni crear más de uno nuevo por segundo. El agente utiliza la marca temporal de creación de archivos para determinar a qué archivos realizarles seguimiento y poner en fila en la secuencia. Crear nuevos archivos o rotarlos más de una vez por segundo evita que el agente pueda diferenciarlos correctamente.{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "yourkinesisstream
" } ] } -
Comience el agente de forma manual:
sudo service aws-kinesis-agent start
-
Configure el agente para iniciarse al arrancar el sistema (opcional):
sudo chkconfig aws-kinesis-agent on
El agente ya está se ejecutando como un servicio de sistema en segundo plano. Monitoriza constantemente los archivos especificados y envía datos a la secuencia especificada. La auditoría de actividad se registra en /var/log/aws-kinesis-agent/aws-kinesis-agent.log
.
Especificar las opciones de configuración del agente
El agente admite las dos opciones de configuración obligatorias, filePattern
y kinesisStream
, además de configuraciones opcionales para activar características adicionales. Las opciones de configuración obligatorias y opcionales se especifican en /etc/aws-kinesis/agent.json
.
Cada vez que cambie el archivo de configuración, debe detener y comenzar el agente con los siguientes comandos:
sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start
También puede hacerlo con el siguiente comando:
sudo service aws-kinesis-agent restart
Las opciones de configuración generales son las siguientes.
Opción de configuración | Descripción |
---|---|
assumeRoleARN |
El ARN del rol que asumirá el usuario. Para obtener más información, consulte Delegar el acceso entre cuentas de AWS mediante roles de IAM en la Guía del usuario de IAM. |
assumeRoleExternalId |
Un identificador opcional que determina quién puede asumir el rol. Para obtener más información, consulte Cómo utilizar un ID externo en la Guía del usuario de IAM. |
awsAccessKeyId |
ID de clave de acceso de AWS que anula las credenciales predeterminadas. Este ajuste tiene prioridad sobre los demás proveedores de credenciales. |
awsSecretAccessKey |
Clave de secreta de AWS que anula las credenciales predeterminadas. Este ajuste tiene prioridad sobre los demás proveedores de credenciales. |
cloudwatch.emitMetrics |
Permite al agente emitir métricas a CloudWatch si se establece (true). Predeterminado: true |
cloudwatch.endpoint |
Punto de conexión regional de CloudWatch. Valor predeterminado: |
kinesis.endpoint |
El punto de conexión regional para Kinesis Data Streams. Valor predeterminado: |
Las opciones de configuración de flujo son las siguientes.
Opción de configuración | Descripción |
---|---|
dataProcessingOptions |
La lista de opciones de procesamiento aplicadas a cada registro analizado antes de enviarlo a la secuencia. Las opciones de procesamiento se realizan en el orden especificado. Para obtener más información, consulte Usar el agente para preprocesar el agente. |
kinesisStream |
[Obligatorio] El nombre de la secuencia. |
filePattern |
[Obligatorio] El patrón del directorio y del archivo que debe coincidir para que el agente lo capte. Se debe conceder permisos de lectura a |
initialPosition |
La posición inicial desde la que el archivo comenzó a ser analizado. Los valores válidos son Valor predeterminado: |
maxBufferAgeMillis |
El tiempo máximo, en milisegundos, durante el cual el agente almacena los datos en búfer antes de enviarlos a la secuencia. Intervalo de valores: 1000 a 900 000 (1 segundo a 15 minutos) Valor predeterminado: 60 000 (1 minuto) |
maxBufferSizeBytes |
La cantidad de datos en bytes que el agente almacena en búfer antes de enviarlos a la secuencia. Intervalo de valores: 1 a 4 194 304 (4 MB) Valor predeterminado: 4 194 304 (4 MB) |
maxBufferSizeRecords |
La cantidad máxima de registros en datos que el agente almacena en búfer antes de enviarlos a la secuencia. Intervalo de valores: 1 a 500 Predeterminado: 500 |
minTimeBetweenFilePollsMillis |
El intervalo de tiempo, en milisegundos, en el que el agente sondea y analiza los archivos monitorizados para identificar datos nuevos. Intervalo de valores: 1 o más Predeterminado: 100 |
multiLineStartPattern |
El patrón para identificar el comienzo de un registro. Un registro consta de una línea que coincide con el patrón y de líneas siguientes que no coinciden con el patrón. Los valores válidos son expresiones regulares. De forma predeterminada, cada línea en los archivos de registro se analiza como un registro. |
partitionKeyOption |
El método para generar la clave de partición. Los valores válidos son Valor predeterminado: |
skipHeaderLines |
La cantidad de líneas de los archivos monitorizados, a partir de la primera, que el agente debe omitir en el momento de analizarlos. Intervalo de valores: 0 o más Cantidad predeterminada: 0 (cero) |
truncatedRecordTerminator |
La cadena que utiliza el agente para truncar un registro analizado cuando su tamaño supera el límite de tamaño de registros de Kinesis Data Streams. (1000 KB) Valor predeterminado: |
Monitorear varios directorios de archivos y escribir en varias secuencias
Puede configurar el agente para que monitorice varios directorios de archivos y envíe datos a varias secuencias especificando varias opciones de configuración de secuencia. En el siguiente ejemplo de configuración, el agente supervisa dos directorios de archivos y envía datos a un flujo de Kinesis y a un flujo de entrega de Firehose, respectivamente. Tenga en cuenta que puede especificar diferentes puntos de conexión para Kinesis Data Streams y Firehose para que su flujo de Kinesis y su flujo de entrega de Firehose no tengan que estar en la misma región.
{ "cloudwatch.emitMetrics":
true
, "kinesis.endpoint": "https://your/kinesis/endpoint
", "firehose.endpoint": "https://your/firehose/endpoint
", "flows": [ { "filePattern": "/tmp/app1.log*
", "kinesisStream": "yourkinesisstream
" }, { "filePattern": "/tmp/app2.log*
", "deliveryStream": "yourfirehosedeliverystream
" } ] }
Para más información sobre el uso del agente con Firehose, consulte Escritura en Amazon Data Firehose con el agente de Kinesis.
Usar el agente para preprocesar el agente
El agente puede preprocesar los registros previamente analizados de los archivos monitorizados antes de enviarlos a la secuencia. Para habilitar esta característica, añada la opción de configuración dataProcessingOptions
al flujo de archivos. Se pueden agregar una o más opciones de procesamiento, que se ejecutarán en el orden especificado.
El agente es compatible con las siguientes opciones de procesamiento. Al ser de código abierto, puede desarrollar y ampliar las opciones de procesamiento del agente. Puede descargar el agente desde el agente de Kinesis
Opciones de procesamiento
SINGLELINE
-
Elimina caracteres de nueva línea y espacios al principio y al final de las líneas para convertir un registro multilínea en un registro de una sola línea.
{ "optionName": "SINGLELINE" }
CSVTOJSON
-
Convierte un registro de formato con separaciones mediante delimitadores en un registro de formato JSON.
{ "optionName": "CSVTOJSON", "customFieldNames": [ "
field1
", "field2
",...
], "delimiter": "yourdelimiter
" }customFieldNames
-
[Obligatorio] Los nombres de campos utilizados como claves en cada par de valores de clave JSON. Por ejemplo, si especifica
["f1", "f2"]
, el registro "v1, v2" se convertirá en{"f1":"v1","f2":"v2"}
. delimiter
-
La cadena utilizada como delimitador en el registro. El valor predeterminado es una coma (,).
LOGTOJSON
-
Convierte un registro con un formato de registro en un registro con formato JSON. Los formatos de registro admitidos son Apache Common Log, Apache Combined Log, Apache Error Log y RFC3164 Syslog.
{ "optionName": "LOGTOJSON", "logFormat": "
logformat
", "matchPattern": "yourregexpattern
", "customFieldNames": [ "field1
", "field2
",…
] }logFormat
-
[Obligatorio] El formato de entrada del registro. Los valores posibles son los siguientes:
-
COMMONAPACHELOG
: formato común de registro de Apache. Cada entrada de registro sigue el siguiente patrón de forma predeterminada: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}
". -
COMBINEDAPACHELOG
: formato combinado de registro de Apache. Cada entrada de registro sigue el siguiente patrón de forma predeterminada: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}
". -
APACHEERRORLOG
: formato de registro de errores de Apache. Cada entrada de registro sigue el siguiente patrón de forma predeterminada: "[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}
". -
SYSLOG
: formato RFC3164 de Syslog. Cada entrada de registro sigue el siguiente patrón de forma predeterminada: "%{timestamp} %{hostname} %{program}[%{processid}]: %{message}
".
-
matchPattern
-
El patrón de expresiones regulares utilizado para extraer valores de entradas de registro. Este ajuste se utiliza si la entrada de registro no tiene uno de los formatos de registro predefinidos. Si se utiliza este ajuste, también tendrá que especificar
customFieldNames
. customFieldNames
-
Los nombres de campos utilizados como claves en cada par de valores de clave JSON. Utilice esta opción para definir nombres de campos para valores extraídos de
matchPattern
, o sobrescriba los nombres de campos de los formatos de logs predefinidos.
ejemplo : Configuración LOGTOJSON
Este es un ejemplo de configuración LOGTOJSON
de una entrada de registro en Formato común de registro de Apache convertida a formato JSON:
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }
Antes de la conversión:
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
Después de la conversión:
{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
ejemplo : Configuración LOGTOJSON con campos personalizados
Este es otro ejemplo de configuración LOGTOJSON
:
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }
Con esta configuración, la misma entrada de registro con Formato común de registro de Apache del ejemplo anterior se convierte a formato JSON de la siguiente manera:
{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
ejemplo : Convertir una entrada de registro con Formato común de registro de Apache
La siguiente configuración de flujo convierte la entrada de registro con Formato común de registro de Apache en un registro de una línea en formato JSON:
{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "my-stream
", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
ejemplo : Convertir registros multilínea
La siguiente configuración de flujo analiza aquellos registros multilínea cuya primera línea comience por "[SEQUENCE=
". Primero, el registro se convierte en un registro de una línea. Después, se extraen los valores del registro basándose en tabulaciones delimitadoras. Finalmente, los valores extraídos se asignan a valores customFieldNames
específicos para formar un registro de una línea en formato JSON.
{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "my-stream
", "multiLineStartPattern": "\\[SEQUENCE=
", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1
", "field2
", "field3
" ], "delimiter": "\\t
" } ] } ] }
ejemplo : Configuración LOGTOJSON con patrón de coincidencia
Este es un ejemplo de una configuración de entrada de registro con Formato común de registro de Apache LOGTOJSON
convertida a formato JSON con el último campo (bytes) omitido:
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }
Antes de la conversión:
123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200
Después de la conversión:
{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}
Usar los comandos de la CLI
Iniciar automáticamente al agente al arrancar el sistema:
sudo chkconfig aws-kinesis-agent on
Compruebe el estado del agente:
sudo service aws-kinesis-agent status
Detener el agente:
sudo service aws-kinesis-agent stop
Leer el archivo de registro del agente desde esta ubicación:
/var/log/aws-kinesis-agent/aws-kinesis-agent.log
Desinstalar el agente:
sudo yum remove aws-kinesis-agent
Preguntas frecuentes
¿Hay un agente de Kinesis para Windows?
El agente de Kinesis para Windows es un software diferente del agente de Kinesis para plataformas Linux.
¿Por qué se ralentiza el agente de Kinesis o aumenta el valor de RecordSendErrors
?
Normalmente esto se debe a la limitación de Kinesis. Compruebe la métrica WriteProvisionedThroughputExceeded
para Kinesis Data Streams o la métrica ThrottledRecords
para flujos de entrega de Firehose. Cualquier aumento a partir de 0 en estas métricas indica que es necesario aumentar los límites de flujo. Para obtener más información, consulte Límites de flujos de datos de Kinesis y flujos de entrega de Amazon Firehose.
Una vez que descarte la limitación de velocidad, compruebe si el agente de Kinesis está configurado para enviar a la cola una gran cantidad de archivos pequeños. Se produce un retraso cuando el agente de Kinesis sigue un archivo nuevo, por lo que el agente de Kinesis debería seguir una pequeña cantidad de archivos de mayor tamaño. Intente consolidar los archivos de registro en archivos más grandes.
¿Por qué se producen excepciones java.lang.OutOfMemoryError
?
El agente de Kinesis no tiene memoria suficiente para gestionar la carga de trabajo actual. Intente aumentar JAVA_START_HEAP
y JAVA_MAX_HEAP
en /usr/bin/start-aws-kinesis-agent
y reinicie el agente.
¿Por qué se producen excepciones IllegalStateException : connection pool shut down
?
El agente de Kinesis no tiene suficientes conexiones para gestionar la carga de trabajo actual. Intente aumentar maxConnections
y maxSendingThreads
en los ajustes generales de configuración del agente en /etc/aws-kinesis/agent.json
. El valor predeterminado para estos campos es 12 veces los procesadores de tiempo de ejecución disponibles. Consulte AgentConfiguration.java
¿Cómo puedo depurar otro problema con el agente de Kinesis?
Los registros de nivel DEBUG
pueden habilitarse en /etc/aws-kinesis/log4j.xml
.
¿Cómo debo configurar el agente de Kinesis?
Cuanto menor sea el valor de maxBufferSizeBytes
, más frecuentemente enviará datos el agente de Kinesis. Esto puede ser bueno ya que disminuye el tiempo de entrega de los registros, pero también aumenta las solicitudes por segundo a Kinesis.
¿Por qué el agente de Kinesis envía registros duplicados?
Esto ocurre debido a una mala configuración en el seguimiento de archivos. Asegúrese de que cada fileFlow’s filePattern
solo coincida con un archivo. Esto también puede ocurrir si el modo logrotate
que se está utilizando está en modo copytruncate
. Intente cambiar al modo predeterminado o al de creación para evitar la duplicación. Para obtener más información sobre la gestión de registros duplicados, consulte Handling Duplicate Records.