Usando um pipeline OpenSearch de ingestão com o Amazon S3 - OpenSearch Serviço Amazon

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Usando um pipeline OpenSearch de ingestão com o Amazon S3

Com OpenSearch a ingestão, você pode usar o Amazon S3 como origem ou destino. Ao usar o Amazon S3 como fonte, você envia dados para um pipeline de OpenSearch ingestão. Ao usar o Amazon S3 como destino, você grava dados de um pipeline de OpenSearch ingestão em um ou mais buckets do S3.

Amazon S3 como origem

Há duas maneiras de usar o Amazon S3 como fonte para processar dados: com o processamento do S3 e com escaneamentos programados. SQS

Use o SQS processamento S3 quando precisar de uma varredura quase em tempo real dos arquivos depois que eles forem gravados no S3. Você pode configurar buckets do Amazon S3 para gerar um evento sempre que um objeto for armazenado ou modificado dentro do bucket. Use uma verificação agendada única ou recorrente para processar dados em lote em um bucket do S3.

Pré-requisitos

Para usar o Amazon S3 como fonte de um pipeline de OpenSearch ingestão para uma verificação programada ou SQS processamento do S3, primeiro crie um bucket do S3.

nota

Se o bucket do S3 usado como fonte no pipeline de OpenSearch ingestão estiver em outro Conta da AWS, você também precisará habilitar as permissões de leitura entre contas no bucket. Isso permite que o pipeline leia e processe os dados. Para habilitar permissões entre contas, consulte Bucket owner granting cross-account bucket permissions (Conceder permissões de bucket entre contas como proprietário do bucket) no Guia do usuário do Amazon S3.

Se seus buckets do S3 estiverem em várias contas, use um bucket_owners mapa. Para ver um exemplo, consulte Acesso ao S3 entre contas na OpenSearch documentação.

Para configurar o SQS processamento do S3, você também precisa executar as seguintes etapas:

  1. Crie uma SQS fila da Amazon.

  2. Ative as notificações de eventos no bucket do S3 com a SQS fila como destino.

Etapa 1: configurar a função do pipeline

Ao contrário de outros plug-ins de origem que enviam dados para um pipeline, o plug-in de origem do S3 tem uma arquitetura baseada em leitura na qual o pipeline extrai dados da fonte.

Portanto, para que um pipeline seja lido do S3, você deve especificar uma função na configuração de origem do S3 do pipeline que tenha acesso ao bucket do S3 e à fila da Amazon. SQS O pipeline assumirá essa função para ler os dados da fila.

nota

A função que você especifica na configuração de origem do S3 deve ser a função do pipeline. Portanto, sua função de pipeline deve conter duas políticas de permissões separadas: uma para gravar em um coletor e outra para extrair da origem do S3. Você deve usar o mesmo sts_role_arn em todos os componentes do pipeline.

O exemplo de política a seguir mostra as permissões necessárias para usar o S3 como fonte:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::my-bucket/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2:{account-id}:MyS3EventSqsQueue" } ] }

Você deve anexar essas permissões à IAM função especificada na sts_role_arn opção na configuração do plug-in de origem do S3:

version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::{account-id}:role/pipeline-role processor: ... sink: - opensearch: ...

Etapa 2: Criar o pipeline

Depois de configurar suas permissões, você pode configurar um pipeline de OpenSearch ingestão, dependendo do seu caso de uso do Amazon S3.

Processamento S3 SQS

Para configurar o SQS processamento do S3, configure seu pipeline para especificar o S3 como origem e configure as notificações da AmazonSQS:

version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.us-east-1.amazonaws.com/{account-id}/ingestion-queue" compression: "none" aws: region: "us-east-1" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1"

Se você observar baixa CPU utilização ao processar arquivos pequenos no Amazon S3, considere aumentar a taxa de transferência modificando o valor da opção. workers Para obter mais informações, consulte as opções de configuração do plug-in S3.

Varredura agendada

Para configurar uma verificação agendada, configure seu pipeline com uma programação no nível da verificação que se aplique a todos os seus buckets do S3 ou no nível de bucket. Uma programação em nível de bucket ou uma configuração de intervalo de escaneamento sempre substitui uma configuração em nível de escaneamento.

Você pode configurar escaneamentos agendados com um escaneamento único, que é ideal para migração de dados, ou um escaneamento recorrente, que é ideal para processamento em lote.

Para configurar seu pipeline para ler do Amazon S3, use os blueprints pré-configurados do Amazon S3. Você pode editar a parte da scan da configuração do seu pipeline para atender às suas necessidades de agendamento. Para obter mais informações, consulte Usar esquemas para criar um pipeline.

Digitalização única

Uma varredura agendada única é executada uma vez. Na sua YAML configuração, você pode usar um start_time e end_time para especificar quando deseja que os objetos no bucket sejam escaneados. Como alternativa, você pode usar range para especificar o intervalo de tempo em relação ao horário atual em que você deseja que os objetos no bucket sejam digitalizados.

Por exemplo, um intervalo definido para PT4H verificar todos os arquivos criados nas últimas quatro horas. Para configurar uma varredura única para ser executada pela segunda vez, você deve parar e reiniciar o pipeline. Se você não tiver um intervalo configurado, também deverá atualizar os horários de início e término.

A configuração a seguir configura uma varredura única para todos os buckets e todos os objetos nesses buckets:

version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" acknowledgments: true scan: buckets: - bucket: name: my-bucket-1 filter: include_prefix: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 key_prefix: include: - Objects2/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1" dlq: s3: bucket: "my-bucket-1" region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

A configuração a seguir configura uma varredura única para todos os buckets durante uma janela de tempo especificada. Isso significa que o S3 processa somente os objetos com horários de criação que se enquadram nessa janela.

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

A configuração a seguir configura uma verificação única no nível de escaneamento e no nível do bucket. Os horários de início e término no nível do bucket substituem os horários de início e término no nível do escaneamento.

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

A interrupção de uma tubulação remove qualquer referência preexistente de quais objetos foram escaneados pela tubulação antes da parada. Se um único pipeline de escaneamento for interrompido, ele digitalizará novamente todos os objetos após seu início, mesmo que eles já tenham sido escaneados. Se você precisar interromper um único pipeline de escaneamento, é recomendável alterar sua janela de tempo antes de iniciar o pipeline novamente.

Se você precisar filtrar objetos por hora de início e hora de término, parar e iniciar seu funil é a única opção. Se você não precisar filtrar por hora de início e hora de término, poderá filtrar objetos por nome. Filtrar por nome não exige que você pare e inicie seu funil. Para fazer isso, use include_prefix exclude_suffix e.

Escaneamento recorrente

Uma verificação agendada recorrente executa uma varredura de seus buckets S3 especificados em intervalos regulares e agendados. Você só pode configurar esses intervalos no nível de escaneamento porque não há suporte para configurações individuais em nível de bucket.

Na sua YAML configuração, o interval especifica a frequência da verificação recorrente e pode ser entre 30 segundos e 365 dias. A primeira dessas varreduras sempre ocorre quando você cria o pipeline. count define o número total de instâncias de verificação.

A configuração a seguir configura um escaneamento recorrente, com um atraso de 12 horas entre os escaneamentos:

scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Amazon S3 como destino

Para gravar dados de um pipeline de OpenSearch ingestão em um bucket do S3, use o blueprint pré-configurado do S3 para criar um pipeline com um coletor do S3. Esse pipeline direciona dados seletivos para um OpenSearch coletor e envia simultaneamente todos os dados para arquivamento no S3. Para obter mais informações, consulte Usar esquemas para criar um pipeline.

Ao criar seu coletor S3, você pode especificar sua formatação preferida a partir de uma variedade de codecs de coletor. Por exemplo, se você quiser gravar dados em formato de coluna, escolha o codec Parquet ou Avro. Se você preferir um formato baseado em linhas, escolha JSON ou ND-. JSON Para gravar dados no S3 em um esquema especificado, você também pode definir um esquema embutido nos codecs de coletor usando o formato Avro.

O exemplo a seguir define um esquema embutido em um coletor do S3:

- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }

Ao definir esse esquema, especifique um superconjunto de todas as chaves que podem estar presentes nos diferentes tipos de eventos que seu pipeline entrega a um coletor.

Por exemplo, se um evento tiver a possibilidade de uma chave faltar, adicione essa chave em seu esquema com um valor null. Declarações de valor nulo permitem que o esquema processe dados não uniformes (onde alguns eventos têm essas chaves e outros não). Quando os eventos recebidos têm essas chaves presentes, seus valores são gravados em coletores.

Essa definição de esquema atua como um filtro que só permite que chaves definidas sejam enviadas aos coletores e elimina chaves indefinidas dos eventos recebidos.

Você também pode usar include_keys e exclude_keys no seu coletor para filtrar dados que são roteados para outros coletores. Esses dois filtros são mutuamente exclusivos, então você só pode usar um por vez em seu esquema. Além disso, não é possível usá-los em esquemas definidos pelo usuário.

Para criar pipelines com esses filtros, use o esquema de filtro de coletor pré-configurado. Para obter mais informações, consulte Usar esquemas para criar um pipeline.

Conta cruzada do Amazon S3 como fonte

Você pode conceder acesso a várias contas com o Amazon S3 para que os pipelines de OpenSearch ingestão possam acessar buckets do S3 em outra conta como fonte. Para habilitar o acesso entre contas, consulte Proprietário do bucket concedendo permissões de bucket entre contas no Guia do usuário do Amazon S3. Depois de conceder acesso, certifique-se de que sua função no pipeline tenha as permissões necessárias.

Em seguida, você pode criar uma YAML configuração usando bucket_owners para habilitar o acesso entre contas a um bucket do Amazon S3 como fonte:

s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"