As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Usando um pipeline OpenSearch de ingestão com o Amazon S3
Com OpenSearch a ingestão, você pode usar o Amazon S3 como origem ou destino. Ao usar o Amazon S3 como fonte, você envia dados para um pipeline de OpenSearch ingestão. Ao usar o Amazon S3 como destino, você grava dados de um pipeline de OpenSearch ingestão em um ou mais buckets do S3.
Amazon S3 como origem
Há duas maneiras de usar o Amazon S3 como fonte para processar dados: com o processamento do S3 e com escaneamentos programados. SQS
Use o SQS processamento S3 quando precisar de uma varredura quase em tempo real dos arquivos depois que eles forem gravados no S3. Você pode configurar buckets do Amazon S3 para gerar um evento sempre que um objeto for armazenado ou modificado dentro do bucket. Use uma verificação agendada única ou recorrente para processar dados em lote em um bucket do S3.
Pré-requisitos
Para usar o Amazon S3 como fonte de um pipeline de OpenSearch ingestão para uma verificação programada ou SQS processamento do S3, primeiro crie um bucket do S3.
nota
Se o bucket do S3 usado como fonte no pipeline de OpenSearch ingestão estiver em outro Conta da AWS, você também precisará habilitar as permissões de leitura entre contas no bucket. Isso permite que o pipeline leia e processe os dados. Para habilitar permissões entre contas, consulte Bucket owner granting cross-account bucket permissions (Conceder permissões de bucket entre contas como proprietário do bucket) no Guia do usuário do Amazon S3.
Se seus buckets do S3 estiverem em várias contas, use um mapa bucket_owners
. Para ver um exemplo, consulte Acesso ao S3 entre contas
Para configurar o SQS processamento do S3, você também precisa executar as seguintes etapas:
-
Ative as notificações de eventos no bucket do S3 com a SQS fila como destino.
Etapa 1: configurar a função do pipeline
Ao contrário de outros plug-ins de origem que enviam dados para um pipeline, o plug-in de origem do S3
Portanto, para que um pipeline seja lido do S3, você deve especificar uma função na configuração de origem do S3 do pipeline que tenha acesso ao bucket do S3 e à fila da Amazon. SQS O pipeline assumirá essa função para ler os dados da fila.
nota
A função que você especifica na configuração de origem do S3 deve ser a função do pipeline. Portanto, sua função de pipeline deve conter duas políticas de permissões separadas: uma para gravar em um coletor e outra para extrair da origem do S3. Você deve usar o mesmo sts_role_arn
em todos os componentes do pipeline.
O exemplo de política a seguir mostra as permissões necessárias para usar o S3 como fonte:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::
my-bucket
/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2
:{account-id}
:MyS3EventSqsQueue
" } ] }
Você deve anexar essas permissões à IAM função especificada na sts_role_arn
opção na configuração do plug-in de origem do S3:
version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::
{account-id}
:role/pipeline-role
processor: ... sink: - opensearch: ...
Etapa 2: Criar o pipeline
Depois de configurar suas permissões, você pode configurar um pipeline de OpenSearch ingestão, dependendo do seu caso de uso do Amazon S3.
Processamento S3 SQS
Para configurar o SQS processamento do S3, configure seu pipeline para especificar o S3 como origem e configure as notificações da AmazonSQS:
version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.
us-east-1
.amazonaws.com/{account-id}
/ingestion-queue
" compression: "none" aws: region: "us-east-1
" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}
:role/pipeline-role
" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint
.us-east-1
.es.amazonaws.com"] index: "index-name
" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}
:role/pipeline-role
" region: "us-east-1
"
Se você observar baixa CPU utilização ao processar arquivos pequenos no Amazon S3, considere aumentar a taxa de transferência modificando o valor da opção. workers
Para obter mais informações, consulte as opções de configuração do plug-in do S3
Varredura agendada
Para configurar uma verificação agendada, configure seu pipeline com uma programação no nível da verificação que se aplique a todos os seus buckets do S3 ou no nível de bucket. Uma programação em nível de bucket ou uma configuração de intervalo de escaneamento sempre substitui uma configuração em nível de escaneamento.
Você pode configurar escaneamentos agendados com um escaneamento único, que é ideal para migração de dados, ou um escaneamento recorrente, que é ideal para processamento em lote.
Para configurar seu pipeline para ler a partir Amazon S3, use os esquemas pré-configurados do Amazon S3. Você pode editar a parte da scan
da configuração do seu pipeline para atender às suas necessidades de agendamento. Para obter mais informações, consulte Usar esquemas para criar um pipeline.
Digitalização única
Uma varredura agendada única é executada uma vez. Na sua YAML configuração, você pode usar um start_time
e end_time
para especificar quando deseja que os objetos no bucket sejam escaneados. Como alternativa, você pode usar range
para especificar o intervalo de tempo em relação ao horário atual em que você deseja que os objetos no bucket sejam digitalizados.
Por exemplo, um intervalo definido para PT4H
verificar todos os arquivos criados nas últimas quatro horas. Para configurar uma varredura única para ser executada pela segunda vez, você deve parar e reiniciar o pipeline. Se você não tiver um intervalo configurado, também deverá atualizar os horários de início e término.
A configuração a seguir configura uma varredura única para todos os buckets e todos os objetos nesses buckets:
version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "
us-east-1
" sts_role_arn: "arn:aws:iam::{account-id
}:role/pipeline-role
" acknowledgments: true scan: buckets: - bucket: name:my-bucket-1
filter: include_prefix: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: name:my-bucket-2
key_prefix: include: -Objects2
/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint
.us-east-1
.es.amazonaws.com"] index: "index-name
" aws: sts_role_arn: "arn:aws:iam::{account-id
}:role/pipeline-role
" region: "us-east-1
" dlq: s3: bucket: "my-bucket-1
" region: "us-east-1
" sts_role_arn: "arn:aws:iam::{account-id
}:role/pipeline-role
"
A configuração a seguir configura uma varredura única para todos os buckets durante uma janela de tempo especificada. Isso significa que o S3 processa somente os objetos com horários de criação que se enquadram nessa janela.
scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name:
my-bucket-1
filter: include: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: name:my-bucket-2
filter: include: -Objects2
/ exclude_suffix: - .jpeg - .png
A configuração a seguir configura uma verificação única no nível de escaneamento e no nível do bucket. Os horários de início e término no nível do bucket substituem os horários de início e término no nível do escaneamento.
scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name:
my-bucket-1
filter: include: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name:my-bucket-2
filter: include: -Objects2
/ exclude_suffix: - .jpeg - .png
A interrupção de um pipeline remove qualquer referência pré-existente de quais objetos foram verificados pelo pipeline antes da parada. Se uma única verificação de pipeline for interrompida, ele verificará novamente todos os objetos após seu início, mesmo que eles já tenham sido verificados. Se você precisar interromper uma única verificação de pipeline, é recomendável alterar sua janela de tempo antes de iniciar o pipeline novamente.
Se você precisar filtrar objetos por hora de início e hora de término, parar e iniciar seu pipeline é a única opção. Se você não precisar filtrar por hora de início e hora de término, poderá filtrar objetos por nome. Filtrar por nome não exige que você pare e inicie seu pipeline. Para fazer isso, use include_prefix
e exclude_suffix
.
Escaneamento recorrente
Uma verificação agendada recorrente executa uma varredura de seus buckets S3 especificados em intervalos regulares e agendados. Você só pode configurar esses intervalos no nível de escaneamento porque não há suporte para configurações individuais em nível de bucket.
Na sua YAML configuração, o interval
especifica a frequência da verificação recorrente e pode ser entre 30 segundos e 365 dias. A primeira dessas varreduras sempre ocorre quando você cria o pipeline. count
define o número total de instâncias de verificação.
A configuração a seguir configura um escaneamento recorrente, com um atraso de 12 horas entre os escaneamentos:
scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name:
my-bucket-1
filter: include: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: name:my-bucket-2
filter: include: -Objects2
/ exclude_suffix: - .jpeg - .png
Amazon S3 como destino
Para gravar dados de um pipeline de OpenSearch ingestão em um bucket do S3, use o blueprint pré-configurado do S3 para criar um pipeline com um coletor do S3.
Ao criar seu coletor S3, você pode especificar sua formatação preferida a partir de uma variedade de codecs de coletor
O exemplo a seguir define um esquema embutido em um coletor do S3:
- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }
Ao definir esse esquema, especifique um superconjunto de todas as chaves que podem estar presentes nos diferentes tipos de eventos que seu pipeline entrega a um coletor.
Por exemplo, se um evento tiver a possibilidade de uma chave faltar, adicione essa chave em seu esquema com um valor null
. Declarações de valor nulo permitem que o esquema processe dados não uniformes (onde alguns eventos têm essas chaves e outros não). Quando os eventos recebidos têm essas chaves presentes, seus valores são gravados em coletores.
Essa definição de esquema atua como um filtro que só permite que chaves definidas sejam enviadas aos coletores e elimina chaves indefinidas dos eventos recebidos.
Você também pode usar include_keys
e exclude_keys
no seu coletor para filtrar dados que são roteados para outros coletores. Esses dois filtros são mutuamente exclusivos, então você só pode usar um por vez em seu esquema. Além disso, não é possível usá-los em esquemas definidos pelo usuário.
Para criar pipelines com esses filtros, use o esquema do filtro de coletor pré-configurado. Para obter mais informações, consulte Usar esquemas para criar um pipeline.
Amazon S3 entre contas como origem
Você pode conceder acesso a várias contas com o Amazon S3 para que os pipelines de OpenSearch ingestão possam acessar buckets do S3 em outra conta como fonte. Para habilitar o acesso entre contas, consulte Bucket owner granting cross-account bucket permissions (Conceder permissões de bucket entre contas como proprietário do bucket) no Guia do usuário do Amazon S3. Depois de conceder acesso, certifique-se de que seu perfil no pipeline tenha as permissões necessárias.
Em seguida, você pode criar uma YAML configuração usando bucket_owners
para habilitar o acesso entre contas a um bucket do Amazon S3 como fonte:
s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"