Uso de una canalización OpenSearch de ingestión con Amazon S3 - OpenSearch Servicio Amazon

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Uso de una canalización OpenSearch de ingestión con Amazon S3

Con OpenSearch Ingestion, puede utilizar Amazon S3 como origen o destino. Cuando utiliza Amazon S3 como fuente, envía datos a una canalización de OpenSearch ingestión. Cuando utiliza Amazon S3 como destino, escribe datos de una canalización de OpenSearch ingestión en uno o más buckets de S3.

Amazon S3 como origen

Existen dos formas de utilizar Amazon S3 como fuente para procesar datos: con el SQSprocesamiento S3 y con escaneos programados.

Utilice el SQS procesamiento S3 cuando necesite escanear los archivos casi en tiempo real una vez que se hayan escrito en S3. Puede configurar los buckets de Amazon S3 para que generen un evento cada vez que se almacene o modifique un objeto en el bucket. Utilice un escaneo programado único o recurrente para procesar por lotes los datos de un bucket de S3.

Requisitos previos

Para usar Amazon S3 como fuente de una canalización de OpenSearch ingestión tanto para un escaneo programado como para un SQS procesamiento S3, cree primero un bucket de S3.

nota

Si el bucket de S3 utilizado como fuente en la canalización de OpenSearch ingestión se encuentra en un bucket diferente Cuenta de AWS, también debes habilitar los permisos de lectura entre cuentas en el bucket. Esto permite que la canalización lea y procese los datos. Para habilitar los permisos entre cuentas, consulte Propietario del bucket que concede permisos de bucket entre cuentas en la Guía del usuario de Amazon S3.

Si sus buckets de S3 están en varias cuentas, utilice una asignación bucket_owners. Para ver un ejemplo, consulta el acceso a S3 entre cuentas en la documentación. OpenSearch

Para configurar el SQS procesamiento S3, también debe realizar los siguientes pasos:

  1. Crea una SQS cola de Amazon.

  2. Habilite las notificaciones de eventos en el bucket de S3 con la SQS cola como destino.

Paso 1: configurar el rol de canalización

A diferencia de otros complementos de origen que envían datos a una canalización, el complemento de fuente de S3 tiene una arquitectura basada en lectura en la que la canalización extrae datos de la fuente.

Por lo tanto, para que una canalización pueda leer desde S3, debes especificar un rol dentro de la configuración de origen de S3 de la canalización que tenga acceso tanto al bucket de S3 como a la SQS cola de Amazon. La canalización asumirá este rol para leer los datos de la cola.

nota

El rol que especifique en la configuración de fuente de S3 debe ser el rol de canalización. Por lo tanto, su rol de canalización debe contener dos políticas de permisos independientes: una para escribir en un receptor y otra para extraer de la fuente de S3. Debe usar el mismo sts_role_arn en todos los componentes de la canalización.

El siguiente ejemplo de política muestra los permisos necesarios para usar S3 como fuente:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::my-bucket/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2:{account-id}:MyS3EventSqsQueue" } ] }

Debes adjuntar estos permisos al IAM rol que especifiques en la sts_role_arn opción de la configuración del complemento fuente de S3:

version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::{account-id}:role/pipeline-role processor: ... sink: - opensearch: ...

Paso 2: crear la canalización

Una vez configurados los permisos, puede configurar una canalización de OpenSearch ingestión en función del caso de uso de Amazon S3.

S3: procesamiento SQS

Para configurar el SQS procesamiento de S3, configura tu canalización para especificar S3 como origen y configura SQS las notificaciones de Amazon:

version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.us-east-1.amazonaws.com/{account-id}/ingestion-queue" compression: "none" aws: region: "us-east-1" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1"

Si observa un bajo nivel de CPU utilización al procesar archivos pequeños en Amazon S3, considere la posibilidad de aumentar el rendimiento modificando el valor de la workers opción. Para obtener más información, consulte S3 plugin configuration options.

Escaneo programado

Para configurar un escaneo programado, configure su canalización con un cronograma a nivel de escaneo que se aplique a todos los buckets de S3 o a nivel de bucket. Una programación a nivel de bucket o una configuración de intervalos de escaneo siempre sobrescribe una configuración a nivel de escaneo.

Puede configurar los escaneos programados con un escaneo único, que es ideal para la migración de datos, o un escaneo periódico, que es ideal para el procesamiento por lotes.

Para configurar su canalización para que lea desde Amazon S3, utilice los esquemas de Amazon S3 preconfigurados. Puede editar la parte scan de la configuración de la canalización para adaptarla a sus necesidades de programación. Para más información, consulte Uso de esquemas para crear una canalización.

Escaneo único

Un escaneo único programado se ejecuta una sola vez. En su YAML configuración, puede usar un start_time y end_time para especificar cuándo quiere que se escaneen los objetos del depósito. O bien, puede usar un range para especificar el intervalo de tiempo en relación con la hora actual en el que desea que se escaneen los objetos del bucket.

Por ejemplo, un rango configurado para PT4H escanea todos los archivos creados en las últimas cuatro horas. Para configurar un escaneo único para que se ejecute por segunda vez, debe detener y reiniciar la canalización. Si no tiene un rango configurado, también debe actualizar las horas de inicio y finalización.

La siguiente configuración establece un escaneo único de todos los buckets y todos los objetos de esos buckets:

version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" acknowledgments: true scan: buckets: - bucket: name: my-bucket-1 filter: include_prefix: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 key_prefix: include: - Objects2/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1" dlq: s3: bucket: "my-bucket-1" region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

La siguiente configuración establece un escaneo único de todos los buckets durante un período de tiempo específico. Esto significa que S3 procesa solo los objetos con tiempos de creación que se encuentran dentro de este período.

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

La siguiente configuración establece un escaneo único tanto a nivel de escaneo como a nivel del bucket. Las horas de inicio y finalización a nivel del bucket anulan las horas de inicio y finalización a nivel de escaneo.

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Al detener una canalización, se elimina cualquier referencia preexistente de los objetos que la canalización analizó antes de la detención. Si se detiene una sola canalización de análisis, volverá a analizar todos los objetos una vez iniciada, incluso si ya se analizaron. Si necesita detener una sola canalización de análisis, se recomienda cambiar el intervalo de tiempo antes de volver a iniciar la canalización.

Si necesita filtrar los objetos por hora de inicio y hora de finalización, la única opción es detener e iniciar la canalización. Si no necesita filtrar por hora de inicio y hora de finalización, puede filtrar los objetos por nombre. No es necesario detener e iniciar la canalización para filtrar por nombre. Para ello, use include_prefix y exclude_suffix.

Escaneo periódico

Un escaneo periódico programado ejecuta un escaneo de los buckets de S3 especificados a intervalos regulares y programados. Solo puede configurar estos intervalos a nivel de escaneo, ya que no se admiten configuraciones individuales a nivel de bucket.

En su YAML configuración, interval especifica la frecuencia del escaneo periódico y puede durar entre 30 segundos y 365 días. El primero de estos escaneos siempre se produce al crear la canalización. El count define el número total de instancias de escaneo.

La siguiente configuración establece un escaneo periódico, con un retraso de 12 horas entre los escaneos:

scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Amazon S3 como destino

Para escribir datos de una canalización de OpenSearch ingestión en un depósito de S3, utilice el esquema preconfigurado de S3 para crear una canalización con un colector de S3. Esta canalización dirige los datos selectivos a un OpenSearch sumidero y, simultáneamente, envía todos los datos para archivarlos en S3. Para obtener más información, consulte Uso de esquemas para crear una canalización.

Al crear el receptor de S3, puede especificar el formato que prefiera entre una variedad de códecs de receptor. Por ejemplo, si desea escribir datos en formato de columnas, seleccione el códec Parquet o Avro. Si prefiere un formato basado en filas, elija ND JSON -. JSON Para escribir datos en S3 en un esquema específico, también puede definir un esquema insertado dentro de los códecs de receptor usando el formato Avro.

El siguiente ejemplo define un esquema insertado en un receptor de S3:

- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }

Cuando defina este esquema, especifique un superconjunto de todas las claves que pueden estar presentes en los distintos tipos de eventos que su canalización envía a un receptor.

Por ejemplo, si en un evento existe la posibilidad de que falte una clave, añada esa clave al esquema con un valor null. Las declaraciones de valores nulos permiten que el esquema procese datos no uniformes (algunos eventos tienen estas claves y otros no). Cuando los eventos entrantes tienen estas claves presentes, sus valores se escriben en los receptores.

Esta definición de esquema actúa como un filtro que solo permite enviar las claves definidas a los receptores y elimina las claves indefinidas de los eventos entrantes.

También puede usar include_keys y exclude_keys en su receptor para filtrar los datos que se dirigen a otros receptores. Estos dos filtros se excluyen mutuamente, por lo que solo puede usar uno a la vez en su esquema. Además, no puede utilizarlos en esquemas definidos por el usuario.

Para crear canalizaciones con dichos filtros, utilice el esquema de filtro de receptor preconfigurado. Para obtener más información, consulte Uso de esquemas para crear una canalización.

Varias cuentas de Amazon S3 como origen

Puede conceder acceso a todas las cuentas con Amazon S3 para que las canalizaciones de OpenSearch ingestión puedan acceder a los buckets de S3 de otra cuenta como fuente. Para habilitar los accesos entre cuentas, consulte Propietario del bucket que concede permisos de bucket entre cuentas en la Guía del usuario de Amazon S3. Una vez que haya concedido el acceso, asegúrese de que su rol en la canalización cuente con los permisos necesarios.

A continuación, puede crear una YAML configuración bucket_owners que permita el acceso multicuenta a un bucket de Amazon S3 como fuente:

s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"