本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Amaz OpenSearch on DynamoDB 上使用采集管道
您可以将 OpenSearch 采集管道与 DynamoDB 配合使用,将 DynamoDB 表事件(例如创建、更新和删除)流式传输到亚马逊服务域和集合。 OpenSearch OpenSearch Ingestion 管道整合了变更数据捕获 (CDC) 基础架构,以提供一种高规模、低延迟的方式来持续流式传输 DynamoDB 表中的数据。
您可以通过两种方式使用 DynamoDB 作为处理数据的来源:有或没有完整初始快照。
完整的初始快照是 DynamoDB 使用恢复 PITR () 功能拍摄的表point-in-time 的备份。DynamoDB 将此快照上传到 Amazon S3。从那里,In OpenSearch gestion 管道将其发送到域中的一个索引,或者将其分区到域中的多个索引。为了保持 DynamoDB 中的数据一致 OpenSearch 性,管道将 DynamoDB 表中的所有创建、更新和删除事件与保存在一个或多个索引中的文档同步。 OpenSearch
当您使用完整的初始快照时,您的 OpenSearch 摄取管道会首先提取快照,然后开始从 DynamoDB Streams 读取数据。它最终会赶上并保持 DynamoDB 和之间近乎实时的数据一致性。 OpenSearch选择此选项时,必须同时启用两者,PITR并在您的表上启用 DynamoDB 直播。
您也可以使用 OpenSearch Ingestion 与 DynamoDB 的集成在没有快照的情况下流式传输事件。如果您已经拥有来自其他机制的完整快照,或者您只想通过 DynamoDB Streams 从 DynamoDB 表中流式传输当前事件,请选择此选项。选择此选项时,只需要在表中启用 DynamoDB 流。
有关此集成的更多信息,请参阅开发人员指南中的 Amazon DynamoDB DynamoDB 与 OpenSearch 亚马逊服务的零ETL集成。
先决条件
要设置管道,您必须有一个已启用 DynamoDB Streams 的 DynamoDB 表。您的流应使用 NEW_IMAGE
流视图类型。但是,NEW_AND_OLD_IMAGES
如果这种流视图类型适合您的用例, OpenSearch Ingestion 管道也可以使用流式传输事件。
如果您使用的是快照,则还必须在表上启用 point-in-time 恢复。有关更多信息,请参阅 Amazon DynamoD B 开发者指南中的创建表、启用 point-in-time 恢复和启用流。
步骤 1:配置管道角色
设置 DynamoDB 表后,设置要在管道配置中使用的管道角色,并在该角色中添加以下 DynamoDB 权限:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "allowRunExportJob", "Effect": "Allow", "Action": [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime" ], "Resource": [ "arn:aws:dynamodb:
us-east-1
:{account-id}
:table/my-table
" ] }, { "Sid": "allowCheckExportjob", "Effect": "Allow", "Action": [ "dynamodb:DescribeExport" ], "Resource": [ "arn:aws:dynamodb:us-east-1
:{account-id}
:table/my-table
/export/*" ] }, { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator" ], "Resource": [ "arn:aws:dynamodb:us-east-1
:{account-id}
:table/my-table
/stream/*" ] }, { "Sid": "allowReadAndWriteToS3ForExport", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl" ], "Resource": [ "arn:aws:s3:::my-bucket
/{exportPath}
/*" ] } ] }
您也可以使用 AWS KMS 客户管理的密钥对导出数据文件进行加密。要解密导出的对象,请在管道的导出配置中指定 s3_sse_kms_key_id
作为密钥 ID,格式如下:arn:aws:kms:
。以下策略包括使用客户托管密钥所需的权限:us-west-2
:{account-id}
:key/my-key-id
{ "Sid": "allowUseOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:GenerateDataKey", "kms:Decrypt" ], "Resource":
arn:aws:kms:
}us-west-2
:{account-id}
:key/my-key-id
步骤 2:创建管道
然后,您可以配置如下所示的 OpenSearch 采集管道,将 DynamoDB 指定为来源。此示例管道从PITR快照中提取数据,然后从 table-a
DynamoDB Streams 提取事件。LATEST
的起始位置指示管道应从 DynamoDB Streams 读取最新数据。
version: "2" cdc-pipeline: source: dynamodb: tables: - table_arn: "arn:aws:dynamodb:
us-west-2
:{account-id}
:table/table-a
" export: s3_bucket: "my-bucket
" s3_prefix: "export/" stream: start_position: "LATEST" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}
:role/pipeline-role" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com
"] index: "${getMetadata(\"table_name
\")}" index_type: custom normalize_index: true document_id: "${getMetadata(\"primary_key\")}" action: "${getMetadata(\"opensearch_action\")}" document_version: "${getMetadata(\"document_version\")}" document_version_type: "external"
您可以使用预配置的 DynamoDB 蓝图来创建此管道。有关更多信息,请参阅 使用蓝图创建管道。
数据一致性
OpenSearch Ingestion 支持 end-to-end 确认,以确保数据的持久性。管道读取快照或流时,它会动态创建分区以进行并行处理。当管道在摄取 OpenSearch 域或集合中的所有记录后收到确认信息时,该管道会将该分区标记为已完成。
如果要收录到 OpenSearch 无服务器搜索集合中,可以在管道中生成文档 ID。如果要采集到 OpenSearch 无服务器时间序列集合中,请注意该管道不会生成文档 ID。
In OpenSearch gestion 管道还将传入的事件操作映射到相应的批量索引操作中,以帮助采集文档。这样可以保持数据的一致性,因此 DynamoDB 中的每个数据更改都与中的相应文档更改保持一致。 OpenSearch
映射数据类型
OpenSearch 服务将每个传入文档中的数据类型动态映射到 DynamoDB 中的相应数据类型。下表显示了 S OpenSearch ervice 如何自动映射各种数据类型。
数据类型 | OpenSearch | DynamoDB |
---|---|---|
数字 |
OpenSearch 自动映射数值数据。如果该数字是整数,则将其 OpenSearch 映射为长值。如果数字是小数,则将其 OpenSearch 映射为浮点值。 OpenSearch 根据第一个发送的文档动态映射各种属性。如果您在 DynamoDB 中为同一属性混合了多种数据类型(例如整数和小数),则映射可能会失败。 例如,如果您的第一个文档的属性为整数,而后来的文档具有与小数相同的属性, OpenSearch 则无法采集第二个文档。在这些情况下,应提供一个显式的映射模板,如下所示:
如果需要双精度,请使用字符串类型字段映射。不存在支持 38 位精度的等效数字类型 OpenSearch。 |
DynamoDB 支持数字。 |
数字集 | OpenSearch 自动将数字集映射到由长值或浮点值组成的数组中。与标量数字一样,这取决于摄取的第一个数字是整数还是小数。您可以像映射标量字符串一样提供数字集的映射。 |
DynamoDB 支持表示数字集的类型。 |
String |
OpenSearch 自动将字符串值映射为文本。在某些情况下(例如枚举值),您可以映射到关键字类型。 以下示例说明如何将
|
DynamoDB 支持字符串。 |
字符串集 |
OpenSearch 自动将字符串集映射到字符串数组中。您可以像映射标量字符串一样提供字符串集的映射。 |
DynamoDB 支持表示字符串集的类型。 |
二元 |
OpenSearch 自动将二进制数据映射为文本。您可以提供映射以将它们写成二进制字段 OpenSearch。 以下示例说明如何将
|
DynamoDB 支持二进制类型属性。 |
二进制集 |
OpenSearch 自动将二进制集作为文本映射到二进制数据数组中。您可以像映射标量二进制一样提供数字集的映射。 |
DynamoDB 支持表示二进制值集的类型。 |
布尔值 |
OpenSearch 将 DynamoDB 布尔类型映射为 OpenSearch 布尔类型。 |
DynamoDB 支持布尔类型属性。 |
Null |
OpenSearch 可以采集 DynamoDB 空类型的文档。它将该值作为空值保存在文档中。此类型没有映射,并且此字段未编制索引或不可搜索。 如果对空类型使用相同的属性名称,然后更改为其他类型(例如字符串),则会为第一个非空值 OpenSearch 创建动态映射。后续值仍然可以是 DynamoDB 空值。 |
DynamoDB 支持空类型属性。 |
Map |
OpenSearch 将 DynamoDB 映射属性映射到嵌套字段。嵌套字段内也适用相同的映射。 以下示例将嵌套字段中的字符串映射到中的关键字类型 OpenSearch:
|
DynamoDB 支持映射类型属性。 |
列出 |
OpenSearch 根据列表中的内容,为 DynamoDB 列表提供了不同的结果。 当列表包含所有相同类型的标量类型(例如,所有字符串的列表)时,则将该列表作为 OpenSearch 该类型的数组提取。这适用于字符串、数字、布尔值和空类型。其中每种类型的限制与该类型标量的限制相同。 您还可以使用与映射相同的映射,为映射列表提供映射。 您无法提供混合类型的列表。 |
DynamoDB 支持列表类型属性。 |
设置 |
OpenSearch 根据 DynamoDB 集合中的内容,为 DynamoDB 集提供不同的结果。 当一个集合包含所有相同类型的标量类型(例如,所有字符串的集合)时,则将该集合作为该类型的数组 OpenSearch 摄取。这适用于字符串、数字、布尔值和空类型。其中每种类型的限制与该类型标量的限制相同。 您还可以使用与映射相同的映射,为映射集合提供映射。 您无法提供混合类型的集合。 |
DynamoDB 支持表示集合的类型。 |
我们建议您在 OpenSearch 摄取管道中配置死信队列 (DLQ)。如果您已配置队列,S OpenSearch ervice 会将所有因动态映射失败而无法载入的失败文档发送到队列。
如果自动映射失败,则可以在管道配置中使用 template_type
和 template_content
来定义显式映射规则。或者,您可以在启动管道之前直接在搜索域或集合中创建映射模板。
限制
在为 DynamoDB 设置 OpenSearch 摄取管道时,请考虑以下限制:
-
OpenSearch 采集与 DynamoDB 的集成目前不支持跨区域接入。您的 DynamoDB 表 OpenSearch 和摄取管道必须相同。 AWS 区域
-
您的 DynamoDB 表 OpenSearch 和摄取管道必须相同。 AWS 账户
-
一个 OpenSearch 摄取管道仅支持一个 DynamoDB 表作为其来源。
-
DynamoDB Streams 仅在日志中存储最多 24 小时的数据。如果从大型表的初始快照中摄取数据需要 24 小时或更长时间,则会丢失一些初始数据。为了缓解这种数据丢失,请估计表的大小并配置适当的 OpenSearch 摄取管道计算单元。
DynamoDB 的推荐 CloudWatch 警报
建议使用以下 CloudWatch 指标来监控您的摄取管道的性能。这些指标可以帮助您确定从导出中处理的数据量、从流处理的事件数量、处理导出和流事件时的错误以及写入目标的文档数量。您可以设置 CloudWatch 警报,以便在其中一个指标在指定时间内超过指定值时执行操作。
指标 | 描述 |
---|---|
dynamodb-pipeline.BlockingBuffer.bufferUsage.value |
表示正在使用多少缓冲区。 |
dynamodb-pipeline.dynamodb.activeExportS3ObjectConsumers.value
|
显示正在积极处理待导出OCUs的 Amazon S3 对象的总数。 |
dynamodb-pipeline.dynamodb.bytesProcessed.count
|
从 DynamoDB 源处理的字节数。 |
dynamodb-pipeline.dynamodb.changeEventsProcessed.count
|
从 DynamoDB 流处理的更改事件数量。 |
dynamodb-pipeline.dynamodb.changeEventsProcessingErrors.count
|
从 DynamoDB 处理的更改事件所产生的错误数。 |
dynamodb-pipeline.dynamodb.exportJobFailure.count
|
失败的导出任务提交尝试次数。 |
dynamodb-pipeline.dynamodb.exportJobSuccess.count
|
已成功提交的导出任务数量。 |
dynamodb-pipeline.dynamodb.exportRecordsProcessed.count
|
导出后处理的记录总数。 |
dynamodb-pipeline.dynamodb.exportRecordsTotal.count
|
从 DynamoDB 导出的记录总数,这对于跟踪数据导出量至关重要。 |
dynamodb-pipeline.dynamodb.exportS3ObjectsProcessed.count
|
从 Amazon S3 成功处理的导出数据文件总数。 |
dynamodb-pipeline.opensearch.bulkBadRequestErrors.count
|
批量请求期间由于请求格式错误而导致的错误计数。 |
dynamodb-pipeline.opensearch.bulkRequestLatency.avg
|
向发出的批量写入请求的平均延迟 OpenSearch。 |
dynamodb-pipeline.opensearch.bulkRequestNotFoundErrors.count
|
由于找不到目标数据而失败的批量请求数。 |
dynamodb-pipeline.opensearch.bulkRequestNumberOfRetries.count
|
采集管道为写 OpenSearch 入集群而进行的重试次数。 OpenSearch |
dynamodb-pipeline.opensearch.bulkRequestSizeBytes.sum
|
向发出的所有批量请求的总大小(以字节为单位) OpenSearch。 |
dynamodb-pipeline.opensearch.documentErrors.count
|
向发送文档时出现的错误数 OpenSearch。导致错误的文件将发送至。DLQ |
dynamodb-pipeline.opensearch.documentsSuccess.count
|
成功写入 OpenSearch 集群或集合的文档数。 |
dynamodb-pipeline.opensearch.documentsSuccessFirstAttempt.count
|
第一次尝试成功编入索引 OpenSearch 的文档数。 |
|
处理过程中由于文档版本冲突而导致的错误计数。 |
|
OpenSearch Ingestion 管道通过从源读取数据到写入目标来处理数据的平均延迟。 |
dynamodb-pipeline.opensearch.PipelineLatency.max
|
通过从源读取数据到写入目标来处理数据的 OpenSearch Ingestion 管道的最大延迟。 |
dynamodb-pipeline.opensearch.recordsIn.count
|
成功摄入 OpenSearch的记录数。该指标对于跟踪正在处理和存储的数据量至关重要。 |
dynamodb-pipeline.opensearch.s3.dlqS3RecordsFailed.count
|
写入失败的记录数DLQ。 |
dynamodb-pipeline.opensearch.s3.dlqS3RecordsSuccess.count
|
写入的记录数DLQ。 |
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.count
|
Amazon S3 死信队列请求的延迟测量次数。 |
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.sum
|
Amazon S3 死信队列的所有请求的总延迟 |
dynamodb-pipeline.opensearch.s3.dlqS3RequestSizeBytes.sum
|
向 Amazon S3 死信队列发出的所有请求的总大小(以字节为单位)。 |
dynamodb-pipeline.recordsProcessed.count
|
管道中处理的记录总数,这是衡量整体吞吐量的关键指标。 |
dynamodb.changeEventsProcessed.count
|
没有从 DynamoDB 流收集任何记录。这可能是由于桌上没有活动、正在进行导出或访问 DynamoDB 直播时出现问题。 |
|
尝试触发向 S3 的导出操作失败。 |
|
OpenSearch 由于输入无效而导致的批量请求错误计数,这对于监控数据质量和操作问题至关重要。 |
opensearch.EndToEndLatency.avg
|
端到端延迟高于从 DynamoDB 数据流读取所需的延迟。这可能是由于 OpenSearch 集群规模不足,或者最大管道OCU容量太低,无法满足 DynamoDB WCU 表的吞吐量。这种端到端的延迟在导出后会很高,而且随着时间的推移,它会赶上最新的 DynamoDB 流。 |