Overview of pipeline features in Amazon OpenSearch Ingestion
Amazon OpenSearch Ingestion provisions pipelines, which consist of a source, a buffer, zero or more processors, and one or more sinks. Ingestion pipelines are powered by Data Prepper as the data engine. For an overview of the various components of a pipeline, see Key concepts.
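To make these components concrete, here is a minimal sketch of a pipeline definition. The HTTP path, grok pattern, domain endpoint, index name, and role ARN are illustrative placeholders, not values from this guide; the buffer sits between the source and the processors and is provisioned by OpenSearch Ingestion rather than declared in the YAML.

version: "2"
example-pipeline:
  # Source: accepts log events over HTTP (push-based)
  source:
    http:
      path: "/ingest"
  # Processors: zero or more transformations applied to each event
  processor:
    - grok:
        match:
          log: ['%{COMMONAPACHELOG}']
  # Sinks: one or more destinations for the processed events
  sink:
    - opensearch:
        hosts: ["https://search-example-domain.us-west-2.es.amazonaws.com"]
        index: "application_logs"
        aws:
          region: "us-west-2"
          sts_role_arn: "arn:aws:iam::123456789012:role/example-pipeline-role"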
The following sections provide an overview of some of the most commonly used features in Amazon OpenSearch Ingestion.
Note
This is not an exhaustive list of features that are available for pipelines. For comprehensive documentation of all available pipeline functionality, see the Data Prepper documentation.
Persistent buffering
A persistent buffer stores your data in a disk-based buffer across multiple Availability Zones to add durability to your data. You can use persistent buffering to ingest data for all supported push-based sources without the need to set up a standalone buffer. These include HTTP and OpenTelemetry sources for logs, traces, and metrics.
To enable persistent buffering, choose Enable persistent buffer when creating or updating a pipeline. For more information, see Creating Amazon OpenSearch Ingestion pipelines. OpenSearch Ingestion automatically determines the required buffering capacity based on the Ingestion OpenSearch Compute Units (Ingestion OCUs) that you specify for the pipeline.
If you enable persistent buffering for a pipeline, the default maximum request payload size is 10 MB for HTTP sources and 4 MB for OpenTelemetry sources. For HTTP sources, you can increase the maximum request payload size to 20 MB. The request payload size is the size of the entire HTTP request, which typically contains multiple events. No single event can exceed 3.5 MB. If you don't enable persistent buffering, you can't increase the maximum payload size to 20 MB.
For pipelines with persistent buffering enabled, the configured pipeline units are split between compute units and buffer units. If a pipeline uses a CPU-intensive processor (grok, key value, and/or split string), the units are split in a 1:1 ratio of buffer to compute; otherwise, they are split in a 3:1 ratio. With either ratio, the split is biased toward provisioning more compute units.
For example:
- Pipeline with grok and 2 max units - 1 compute unit and 1 buffer unit
- Pipeline with grok and 5 max units - 3 compute units and 2 buffer units
- Pipeline with no processors and 2 max units - 1 compute unit and 1 buffer unit
- Pipeline with no processors and 4 max units - 1 compute unit and 3 buffer units
- Pipeline with no processors and 5 max units - 2 compute units and 3 buffer units
By default, pipelines use an AWS owned key to encrypt buffer data. These pipelines don't need any additional permissions for the pipeline role. Alternatively, you can specify a customer managed key and add the following IAM permissions to the pipeline role:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:
{region}
:{aws-account-id}
:key/1234abcd-12ab-34cd-56ef-1234567890ab
" } ] }
For more information, see Customer managed keys in the AWS Key Management Service Developer Guide.
Note
If you disable persistent buffering, your pipeline will be updated to run entirely on in-memory buffering.
Provisioning persistent buffering
Amazon OpenSearch Ingestion tracks data written to a sink and automatically resumes writing from the last successful checkpoint if an outage in the sink or another issue prevents data from being written successfully. Persistent buffers require no additional services or components beyond the minimum and maximum Ingestion OpenSearch Compute Units (Ingestion OCUs) set for the pipeline. When persistent buffering is turned on, each Ingestion OCU provides persistent buffering in addition to its existing ability to ingest, transform, and route data. After you enable persistent buffering, data is retained in the buffer for 72 hours. Amazon OpenSearch Ingestion dynamically allocates the buffer from the minimum and maximum OCUs that you define for the pipeline.
The number of Ingestion OCUs used for persistent buffering is calculated dynamically based on the data source, the transformations applied to the streaming data, and the sink that the data is written to. Because a portion of the Ingestion OCUs now applies to persistent buffering, you need to increase the minimum and maximum Ingestion OCUs when turning on persistent buffering in order to maintain the same ingestion throughput for your pipeline. The number of OCUs that you need with persistent buffering depends on the source that you're ingesting data from and on the type of processing that you perform on the data. The following table shows the number of OCUs that you need with persistent buffering with different sources and processors.
Splitting
You can configure an OpenSearch Ingestion pipeline to split incoming events into multiple sub-pipelines, allowing you to perform different types of processing on the same incoming event.
The following example pipeline splits incoming events into two sub-pipelines. Each sub-pipeline uses its own processor to enrich and manipulate the data, and then sends the data to different OpenSearch indexes.
version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"
Chaining
You can chain multiple sub-pipelines together in order to perform data processing and enrichment in chunks. In other words, you can enrich an incoming event with certain processing capabilities in one sub-pipeline, then send it to another sub-pipeline for additional enrichment with a different processor, and finally send it to its OpenSearch sink.
In the following example, the log-pipeline sub-pipeline enriches an incoming log event with a set of processors, then sends the event to an OpenSearch index named enriched_logs. The pipeline sends the same event to the log_advanced_pipeline sub-pipeline, which processes it and sends it to a different OpenSearch index named enriched_advanced_logs.
version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"
Dead-letter queues
Dead-letter queues (DLQs) are destinations for events that a pipeline fails to write to a sink. In OpenSearch Ingestion, you must specify an Amazon S3 bucket with appropriate write permissions to use as the DLQ. You can add a DLQ configuration to every sink within a pipeline. When a pipeline encounters write errors, it creates DLQ objects in the configured S3 bucket. DLQ objects exist within a JSON file as an array of failed events.
A pipeline writes events to the DLQ when either of the following conditions is met:

- The max_retries value for the OpenSearch sink has been exhausted. OpenSearch Ingestion requires a minimum of 16 for this option (see the sketch after this list).
- Events are rejected by the sink due to an error condition.
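The following is a minimal sketch of how max_retries might be set on an OpenSearch sink; the pipeline name and index are placeholders, and the remaining sink options are elided.

apache-log-pipeline:
  ...
  sink:
    - opensearch:
        ...
        index: "logs"
        # OpenSearch Ingestion requires a value of at least 16 here
        max_retries: 16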
Configuration
To configure a dead-letter queue for a sub-pipeline, specify the dlq option within the opensearch sink configuration:
apache-log-pipeline:
  ...
  sink:
    - opensearch:
        dlq:
          s3:
            bucket: "my-dlq-bucket"
            key_path_prefix: "dlq-files"
            region: "us-west-2"
            sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"
Files written to this S3 DLQ will have the following naming pattern:
dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
For more information, see Dead-Letter Queues (DLQ) in the Data Prepper documentation.
For instructions to configure the sts_role_arn role, see Writing to a dead-letter queue.
Example
Consider the following example DLQ file:
dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343
Here's an example of data that failed to be written to the sink and was sent to the DLQ S3 bucket for further analysis:
[
  {
    "pluginId": "opensearch",
    "pluginName": "opensearch",
    "pipelineName": "apache-log-pipeline",
    "failedData": {
      "index": "logs",
      "indexId": null,
      "status": 0,
      "message": "Number of retries reached the limit of max retries (configured value 15)",
      "document": {
        "log": "sample log",
        "timestamp": "2023-04-14T10:36:01.070Z"
      }
    }
  },
  {
    "pluginId": "opensearch",
    "pluginName": "opensearch",
    "pipelineName": "apache-log-pipeline",
    "failedData": {
      "index": "logs",
      "indexId": null,
      "status": 0,
      "message": "Number of retries reached the limit of max retries (configured value 15)",
      "document": {
        "log": "another sample log",
        "timestamp": "2023-04-14T10:36:01.071Z"
      }
    }
  }
]
Index management
Amazon OpenSearch Ingestion has many index management capabilities, including the following.
Creating indexes
You can specify an index name in a pipeline sink and OpenSearch Ingestion creates the index when it provisions the pipeline. If an index already exists, the pipeline uses it to index incoming events. If you stop and restart a pipeline, or if you update its YAML configuration, the pipeline attempts to create new indexes if they don't already exist. A pipeline can never delete an index.
The following example sinks create two indexes when the pipeline is provisioned:
sink:
  - opensearch:
      index: apache_logs
  - opensearch:
      index: nginx_logs
Generating index names and patterns
You can generate dynamic index names by using variables from the fields of incoming events. In the sink configuration, use the format string ${} to signal string interpolation, and use a JSON pointer to extract fields from events.
The options for index_type are custom or management_disabled. Because index_type defaults to custom for OpenSearch domains and management_disabled for OpenSearch Serverless collections, it can be left unset.
For example, the following pipeline selects the metadataType field from incoming events to generate index names.
pipeline:
  ...
  sink:
    - opensearch:
        index: "metadata-${metadataType}"
The following configurations generate a new index every day or every hour, respectively.
pipeline:
  ...
  sink:
    - opensearch:
        index: "metadata-${metadataType}-%{yyyy.MM.dd}"

pipeline:
  ...
  sink:
    - opensearch:
        index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"
The index name can also be a plain string with a date-time pattern as a suffix, such as my-index-%{yyyy.MM.dd}. When the sink sends data to OpenSearch, it replaces the date-time pattern with UTC time and creates a new index for each day, such as my-index-2022.01.25. For more information, see the DateTimeFormatter class in the Java documentation.
This index name can also be a formatted string (with or without a date-time pattern suffix), such as my-${index}-name. When the sink sends data to OpenSearch, it replaces the "${index}" portion with the value in the event being processed. If the format is "${index1/index2/index3}", it replaces the field index1/index2/index3 with its value in the event.
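As an illustration of the JSON pointer form described above, with hypothetical field names and values, a sink configured as follows:

sink:
  - opensearch:
      ...
      index: "service-${metadata/name}-%{yyyy.MM.dd}"

would write an event containing "metadata": { "name": "payments" } to an index such as service-payments-2022.01.25.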
Generating document IDs
A pipeline can generate a document ID while indexing documents to OpenSearch. It can infer these document IDs from the fields within incoming events.
This example uses the uuid field from an incoming event to generate a document ID.
pipeline:
  ...
  sink:
    - opensearch:
        index_type: custom
        index: "metadata-${metadataType}-%{yyyy.MM.dd}"
        document_id_field: "uuid"
In the following example, the Add entries processor uses both uuid and other_field from the incoming event to generate a document ID.
The create action ensures that documents with identical IDs aren't overwritten. The pipeline drops duplicate documents without any retry or DLQ event. This is a reasonable expectation for pipeline authors who use this action, because the goal is to avoid updating existing documents.
pipeline:
  ...
  processor:
    - add_entries:
        entries:
          - key: "my_doc_id_field"
            format: "${uuid}-${other_field}"
  sink:
    - opensearch:
        ...
        action: "create"
        document_id_field: "my_doc_id_field"
You might want to set an event's document ID to a field from a sub-object. In the following example, the OpenSearch sink plugin uses the sub-object info/id to generate a document ID.
sink:
  - opensearch:
      ...
      document_id_field: info/id
Given the following event, the pipeline will generate a document with the _id field set to json001:
{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }
Generating routing IDs
You can use the routing_field option within the OpenSearch sink plugin to set the value of a document routing property (_routing) to a value from an incoming event.
Routing supports JSON pointer syntax, so nested fields are available in addition to top-level fields.
sink:
  - opensearch:
      ...
      routing_field: metadata/id
      document_id_field: id
Given the following event, the plugin generates a document with the _routing field set to abcd:
{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }
For instructions to create index templates that pipelines can use during index creation, see Index templates.
End-to-end acknowledgement
OpenSearch Ingestion ensures the durability and reliability of data by tracking its delivery from source to sinks in stateless pipelines using end-to-end acknowledgement. Currently, only the S3 source plugin supports end-to-end acknowledgement.
With end-to-end acknowledgement, the pipeline source plugin creates an acknowledgement set to monitor a batch of events. It receives a positive acknowledgement when those events are successfully sent to their sinks, or a negative acknowledgement when any of the events could not be sent to their sinks.
In the event of a failure or crash of a pipeline component, or if a source fails to receive an acknowledgement, the source times out and takes necessary actions such as retrying or logging the failure. If the pipeline has multiple sinks or multiple sub-pipelines configured, event-level acknowledgements are sent only after the event is sent to all sinks in all sub-pipelines. If a sink has a DLQ configured, end-to-end acknowledgements also track events written to the DLQ.
To enable end-to-end acknowledgement, include the acknowledgments option within the source configuration:
s3-pipeline:
  source:
    s3:
      acknowledgments: true
      ...
Source back pressure
A pipeline can experience back pressure when it's busy processing data, or if its sinks are temporarily down or slow to ingest data. OpenSearch Ingestion has different ways of handling back pressure depending on the source plugin that a pipeline is using.
HTTP source
Pipelines that use the HTTP source plugin handle back pressure in the following ways:
- Buffers – When buffers are full, the pipeline starts returning HTTP status REQUEST_TIMEOUT with error code 408 back to the source endpoint. As buffers are freed up, the pipeline starts processing HTTP events again.
- Source threads – When all HTTP source threads are busy executing requests and the unprocessed request queue size has exceeded the maximum allowed number of requests, the pipeline starts to return HTTP status TOO_MANY_REQUESTS with error code 429 back to the source endpoint. When the request queue drops below the maximum allowed queue size, the pipeline starts processing requests again.
OTel source
When buffers are full for pipelines that use OpenTelemetry sources (OTel logs, OTel metrics, and OTel traces), the pipeline starts returning HTTP status REQUEST_TIMEOUT with error code 408 to the source endpoint. As buffers are freed up, the pipeline starts processing events again.
S3 source
When buffers are full for pipelines with an S3 source, the pipeline stops processing SQS notifications. As buffers are freed up, the pipeline starts processing notifications again. If a sink is down or unable to ingest data and end-to-end acknowledgement is enabled for the source, the pipeline stops processing SQS notifications until it receives a successful acknowledgement from all sinks.
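The following is a minimal sketch of an S3 source that reads objects based on Amazon SQS notifications with end-to-end acknowledgement enabled; the queue URL, codec, Region, and role ARN are illustrative placeholders, not values from this guide.

s3-pipeline:
  source:
    s3:
      # With acknowledgments enabled, the source stops processing SQS
      # notifications until all sinks confirm delivery of the events
      acknowledgments: true
      notification_type: "sqs"
      codec:
        newline:
      sqs:
        queue_url: "https://sqs.us-west-2.amazonaws.com/123456789012/my-notification-queue"
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/s3-source-role"
  sink:
    - opensearch:
        ...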