Amazon Athena MSK 连接器
使用适用于 Amazon MSK
此连接器不使用 Glue 连接将配置属性集中保存到 Glue 中。连接配置通过 Lambda 完成。
先决条件
可以使用 Athena 控制台或 AWS Serverless Application Repository 将该连接器部署到您的 AWS 账户。有关更多信息,请参阅创建数据来源连接或使用 AWS Serverless Application Repository 部署数据来源连接器。
限制
-
不支持写入 DDL 操作。
-
任何相关的 Lambda 限制。有关更多信息,请参阅《AWS Lambda 开发人员指南》中的 Lambda 配额。
-
必须将筛选条件中的日期和时间戳数据类型转换为适当的数据类型。
-
CSV 文件类型不支持日期和时间戳数据类型,它们被视为 VARCHAR 值。
-
不支持映射到嵌套 JSON 字段。连接器仅映射顶级字段。
-
连接器不支持复杂类型。复杂类型解释为字符串。
-
要提取或处理复杂的 JSON 值,请使用 Athena 中可用的 JSON 相关函数。有关更多信息,请参阅 从字符串中提取 JSON 数据。
-
连接器不支持对 Kafka 消息元数据的访问。
术语
-
元数据处理程序 — 从您的数据库实例中检索元数据的 Lambda 处理程序。
-
记录处理程序 — 从您的数据库实例中检索数据记录的 Lambda 处理程序。
-
复合处理程序 — 从您的数据库实例中检索元数据和数据记录的 Lambda 处理程序。
-
Kafka 端点 – 文本字符串,用于建立与 Kafka 实例的连接。
集群兼容性
MSK 连接器可用于以下集群类型。
-
MSK 预置集群 – 您可以手动指定、监控和扩展集群容量。
-
MSK 无服务器集群 – 提供随应用程序 I/O 扩展而自动扩展的按需容量。
-
独立 Kafka - 与 Kafka 直接连接(经过或未经身份验证)。
支持的身份验证方法
连接器支持以下身份验证方法。
-
SASL/PLAIN
-
SASL/PLAINTEXT
-
NO_AUTH
有关更多信息,请参阅 为 Athena MSK 连接器配置身份验证。
支持的输入数据格式
连接器支持以下输入数据格式。
-
JSON
-
CSV
参数
使用本节中的参数来配置 Athena MSK 连接器。
-
auth_type – 指定集群的身份验证类型。连接器支持以下身份验证类型:
-
NO_AUTH – 无需身份验证即可直接连接到 Kafka(例如,连接到部署在 EC2 实例上的 Kafka 集群,但不使用身份验证)。
-
SASL_SSL_PLAIN – 此方法使用
SASL_SSL
安全协议和PLAIN
SASL 机制。 -
SASL_PLAINTEXT_PLAIN – 此方法使用
SASL_PLAINTEXT
安全协议和PLAIN
SASL 机制。注意
Apache Kafka 支持
SASL_SSL_PLAIN
和SASL_PLAINTEXT_PLAIN
身份验证类型,但 Amazon MSK 不支持。 -
SASL_SSL_AWS_MSK_IAM – Amazon MSK 的 IAM 访问控制使您能够处理 MSK 集群的身份验证和授权。您用户的 AWS 凭证(密钥和访问密钥)用于与集群连接。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 IAM access control(IAM 访问控制)。
-
SASL_SSL_SCRAM_SHA512 – 您可以使用这种身份验证类型来控制对 Amazon MSK 集群的访问权限。此方法将用户名和密码存储在 AWS Secrets Manager 上。密钥必须与 Amazon MSK 集群相关。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的为 Amazon MSK 集群设置 SASL/SCRAM 身份验证。
-
SSL – SSL 身份验证使用密钥存储和信任存储文件来连接 Amazon MSK 集群。您必须生成信任存储和密钥存储文件,将其上传到 Amazon S3 存储桶,并在部署连接器时提供对 Amazon S3 的引用。密钥存储、信任存储和 SSL 密钥存储在 AWS Secrets Manager 中。部署连接器时需要提供 AWS 私有密钥。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 Mutual TLS authentication(双向 TLS 身份验证)。
有关更多信息,请参阅 为 Athena MSK 连接器配置身份验证。
-
-
certificates_s3_reference – 包含证书(密钥存储和信任存储文件)的 Amazon S3 位置。
-
disable_spill_encryption -(可选)当设置为
True
时,将禁用溢出加密。默认值为False
,此时将使用 AES-GCM 对溢出到 S3 的数据使用进行加密 - 使用随机生成的密钥,或者使用 KMS 生成密钥。禁用溢出加密可以提高性能,尤其是当您的溢出位置使用服务器端加密时。 -
kafka_endpoint – 提供给 Kafka 的端点详细信息。例如,对于 Amazon MSK 集群,您可以为该集群提供引导 URL。
-
secrets_manager_secret – 保存凭证的 AWS 密钥名称。IAM 身份验证不需要此参数。
-
溢出参数 – Lambda 函数将不适合内存的数据临时存储(“溢出”)到 Amazon S3。由同一 Lambda 函数访问的所有数据库实例都会溢出到同一位置。使用下表中的参数指定溢出位置。
参数 描述 spill_bucket
必需。Amazon S3 存储桶的名称,Lambda 函数可以在该存储桶中溢出数据。 spill_prefix
必需。溢出存储桶中的前缀,Lambda 函数可以在该存储桶中溢出数据。 spill_put_request_headers
(可选)用于溢出的 Amazon S3 putObject
请求的请求标头和值的 JSON 编码映射(例如,{"x-amz-server-side-encryption" : "AES256"}
)。有关其他可能的标头,请参阅《Amazon Simple Storage Service API 参考》中的 PutObject。
数据类型支持
下表显示了 Kafka 和 Apache Arrow 支持的相应数据类型。
Kafka | Arrow |
---|---|
CHAR | VARCHAR |
VARCHAR | VARCHAR |
TIMESTAMP | MILLISECOND |
DATE | DAY |
BOOLEAN | BOOL |
SMALLINT | SMALLINT |
INTEGER | INT |
BIGINT | BIGINT |
DECIMAL | FLOAT8 |
DOUBLE | FLOAT8 |
分区和拆分
Kafka 主题分为多个分区。每个分区会进行排序。分区中的每条消息都有一个增量 ID,称为偏移。每个 Kafka 分区进一步分为多个分区,以进行并行处理。数据在 Kafka 集群中配置的保留期内可用。
最佳实践
最佳做法是在查询 Athena 时使用谓词下推,如下例所示。
SELECT * FROM "
msk_catalog_name
"."glue_schema_registry_name
"."glue_schema_name
" WHERE integercol = 2147483647
SELECT * FROM "
msk_catalog_name
"."glue_schema_registry_name
"."glue_schema_name
" WHERE timestampcol >= TIMESTAMP '2018-03-25 07:30:58.878'
设置 MSK 连接器
在使用连接器之前,您必须设置 Amazon MSK 集群,使用 AWS Glue 架构注册表定义架构,并为连接器配置身份验证。
注意
如果您将连接器部署到 VPC 中以访问私有资源,并且还想连接到 Confluent 等可公开访问的服务,则必须将连接器关联到某个具有 NAT 网关的私有子网。有关更多信息,请参阅《Amazon VPC 用户指南》中的 NAT 网关。
使用 AWS Glue 架构注册表时,请注意以下几点:
-
确保 AWS Glue 架构注册表的 Description(描述)字段中的文本包含
{AthenaFederationMSK}
字符串。此标记字符串是与 Amazon Athena MSK 连接器一起使用的 AWS Glue 注册表中的必填字段。 -
为了获得最佳性能,数据库名称和表名称仅限使用小写。使用混合大小写会使连接器执行不区分大小写的搜索,这种搜索的计算密集度更高。
设置 Amazon MSK 环境和 AWS Glue 架构注册表
-
设置 Amazon MSK 环境。有关信息和步骤,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 Setting up Amazon MSK(设置 Amazon MSK)和 Getting started using Amazon MSK(开始使用 Amazon MSK)。
-
将 JSON 格式的 Kafka 主题描述文件(即其架构)上传到 AWS Glue 架构注册表。有关更多信息,请参阅《AWS Glue 开发人员指南》中的与 AWS Glue 架构注册表集成。有关详细示例,请参阅以下章节。
将架构上传到 AWS Glue 架构注册表时,请使用本节中的示例格式。
JSON 类型架构示例
在以下示例中,要在 AWS Glue 架构注册表中创建的架构指定 json
作为 dataFormat
的值,并将 datatypejson
用于 topicName
。
注意
topicName
的值应使用与 Kafka 中主题名称相同的大小写。
{ "topicName": "datatypejson", "message": { "dataFormat": "json", "fields": [ { "name": "intcol", "mapping": "intcol", "type": "INTEGER" }, { "name": "varcharcol", "mapping": "varcharcol", "type": "VARCHAR" }, { "name": "booleancol", "mapping": "booleancol", "type": "BOOLEAN" }, { "name": "bigintcol", "mapping": "bigintcol", "type": "BIGINT" }, { "name": "doublecol", "mapping": "doublecol", "type": "DOUBLE" }, { "name": "smallintcol", "mapping": "smallintcol", "type": "SMALLINT" }, { "name": "tinyintcol", "mapping": "tinyintcol", "type": "TINYINT" }, { "name": "datecol", "mapping": "datecol", "type": "DATE", "formatHint": "yyyy-MM-dd" }, { "name": "timestampcol", "mapping": "timestampcol", "type": "TIMESTAMP", "formatHint": "yyyy-MM-dd HH:mm:ss.SSS" } ] } }
CSV 类型架构示例
在以下示例中,要在 AWS Glue 架构注册表中创建的架构指定 csv
作为 dataFormat
的值,并将 datatypecsvbulk
用于 topicName
。topicName
的值应使用与 Kafka 中主题名称相同的大小写。
{ "topicName": "datatypecsvbulk", "message": { "dataFormat": "csv", "fields": [ { "name": "intcol", "type": "INTEGER", "mapping": "0" }, { "name": "varcharcol", "type": "VARCHAR", "mapping": "1" }, { "name": "booleancol", "type": "BOOLEAN", "mapping": "2" }, { "name": "bigintcol", "type": "BIGINT", "mapping": "3" }, { "name": "doublecol", "type": "DOUBLE", "mapping": "4" }, { "name": "smallintcol", "type": "SMALLINT", "mapping": "5" }, { "name": "tinyintcol", "type": "TINYINT", "mapping": "6" }, { "name": "floatcol", "type": "DOUBLE", "mapping": "7" } ] } }
为 Athena MSK 连接器配置身份验证
您可以使用多种方法对 Amazon MSK 集群进行身份验证,包括 IAM、SSL、SCRAM 和独立的 Kafka。
下表显示了连接器的身份验证类型以及每种连接器的安全协议和 SASL 机制。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 Authentication and authorization for Apache Kafka APIs(Apache Kafka API 的身份验证和授权)。
auth_type | security.protocol | sasl.mechanism |
---|---|---|
SASL_SSL_PLAIN |
SASL_SSL |
PLAIN |
SASL_PLAINTEXT_PLAIN |
SASL_PLAINTEXT |
PLAIN |
SASL_SSL_AWS_MSK_IAM |
SASL_SSL |
AWS_MSK_IAM |
SASL_SSL_SCRAM_SHA512 |
SASL_SSL |
SCRAM-SHA-512 |
SSL |
SSL |
不适用 |
注意
Apache Kafka 支持 SASL_SSL_PLAIN
和 SASL_PLAINTEXT_PLAIN
身份验证类型,但 Amazon MSK 不支持。
SASL/IAM
如果集群使用 IAM 身份验证,则在设置集群时必须为用户配置 IAM policy。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 IAM access control(IAM 访问控制)。
要使用此身份验证类型,请将连接器的 auth_type
Lambda 环境变量设置为 SASL_SSL_AWS_MSK_IAM
。
SSL
如果集群经过 SSL 身份验证,则您必须生成信任存储和密钥存储文件,并将其上传到 Amazon S3 存储桶。部署连接器时,必须提供此 Amazon S3 参考资料。密钥存储、信任存储和 SSL 密钥存储在 AWS Secrets Manager 中。部署连接器时需要提供 AWS 密钥。
有关在 Secrets Manager 中创建密钥的信息,请参阅创建 AWS Secrets Manager 密钥。
要使用此身份验证类型,请按下表所示设置环境变量。
参数 | 值 |
---|---|
auth_type |
SSL |
certificates_s3_reference |
包含证书的 Amazon S3 位置。 |
secrets_manager_secret |
您的 AWS 密钥名称。 |
在 Secrets Manager 中创建密钥后,您可以在 Secrets Manager 控制台中查看该密钥。
在 Secrets Manager 中查看密钥
打开 Secrets Manager 控制台,网址为 https://console.aws.amazon.com/secretsmanager/
。 -
在导航窗格中,选择 Secrets(密钥)。
-
在 Secrets(密钥)页面,选择密钥链接。
-
在密钥的详细信息页面上,选择 Retrieve secret value(检索密钥值)。
下图显示了示例密钥,其中包含三个键值对:
keystore_password
、truststore_password
和ssl_key_password
。
SASL/SCRAM
如果您的集群使用 SCRAM 身份验证,请在部署连接器时提供与集群关联的 Secrets Manager 密钥。用户的 AWS 凭证(密钥和访问密钥)用于与集群进行身份验证。
按下表所示设置环境变量。
参数 | 值 |
---|---|
auth_type |
SASL_SSL_SCRAM_SHA512 |
secrets_manager_secret |
您的 AWS 密钥名称。 |
下图显示了 Secrets Manager 控制台中的示例密钥,其中包含两个键值对:一个用于 username
,另一个用于 password
。
许可证信息
使用此连接器,即表示您确认包含第三方组件(这些组件的列表可在此连接器的 pom.xml
其他资源
有关此连接器的更多信息,请访问 GitHub.com 上的相应站点