开始使用 Amazon Managed Streaming for Apache Kafka(Amazon MSK)流式摄取 - Amazon Redshift

开始使用 Amazon Managed Streaming for Apache Kafka(Amazon MSK)流式摄取

本主题介绍如何通过实体化视图使用来自 Amazon MSK 的流数据。

Amazon Redshift 流式摄取的目的是简化将流式数据直接从流式服务摄取到 Amazon Redshift 或 Amazon Redshift Serverless 的过程。这适用于预置 Amazon MSK 和 Amazon MSK Serverless 以及 Kinesis Data Streams。使用 Amazon Redshift 流式摄取时,在将流式数据摄取到 Redshift 之前,无需在 Amazon S3 中暂存 Kinesis Data Streams 流或 Amazon MSK 主题。

在技术层面上,来自 Amazon Kinesis Data Streams 和 Amazon MSK 的流式摄取以低延迟、高速度的方式将流式或主题数据摄取到 Amazon Redshift 实体化视图中。设置完成后,使用实体化视图刷新,可以接收大量数据。

通过执行以下步骤,为 Amazon MSK 设置 Amazon Redshift 流式摄取:

  1. 创建映射到流式数据来源的外部 Schema。

  2. 创建引用外部 Schema 的实体化视图。

在配置 Amazon Redshift 流式摄取之前,您必须有可用的 Amazon MSK 源。如果您没有源,请按照开始使用 Amazon MSK 中的说明进行操作。

注意

流式摄取和 Amazon Redshift Serverless – 本主题中的配置步骤同时适用于预调配的 Amazon Redshift 集群和 Amazon Redshift Serverless。有关更多信息,请参阅 流式摄取行为和数据类型

设置 IAM 权限并从 Kafka 执行流式摄取

假设您有可用的 Amazon MSK 集群,第一步是使用 CREATE EXTERNAL SCHEMA 在 Redshift 中定义一个架构,并引用 Amazon MSK 集群作为数据来源。之后,要访问主题中的数据,请在实体化视图中定义 STREAM。您可以使用默认的 Amazon Redshift VARBYTE 数据类型存储主题中的记录,也可以定义架构来将数据转换为半结构化 SUPER 格式。当您查询实体化视图时,返回的记录是主题的时间点视图。

  1. 如果您使用 AUTHENTICATION NONE 连接到 MSK,则不需要任何 IAM 角色。但是,如果使用 AUTHENTICATION IAM 或 MTLS 向 Amazon MSK 集群进行身份验证,则 Amazon Redshift 集群或 Amazon Redshift Serverless 命名空间必须附加具有适当权限的 IAM 角色。使用允许 Amazon Redshift 集群或 Amazon Redshift Serverless 命名空间代入 IAM 角色的信任策略创建该角色。创建角色后,添加以下权限之一以支持 IAM 或 MTLS。对于 mTLS 身份验证,Amazon Redshift 使用的证书可以存储在 AWS Certificate Manager 或 AWS Secrets Manager 中,因此必须选择与证书存储位置相匹配的策略。将角色附加到 Amazon Redshift 预置集群或 Redshift Serverless 命名空间。有关如何为 IAM 角色配置信任策略的信息,请参阅授权 Amazon Redshift 代表您访问其他 AWS 服务

    AUTHENTICATION IAM:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "MSKIAMpolicy", "Effect": "Allow", "Action": [ "kafka-cluster:ReadData", "kafka-cluster:DescribeTopic", "kafka-cluster:Connect" ], "Resource": [ "arn:aws:kafka:*:0123456789:cluster/MyTestCluster/*", "arn:aws:kafka:*:0123456789:topic/MyTestCluster/*" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:*:0123456789:group/MyTestCluster/*" ] } ] }

    AUTHENTICATION MTLS:使用存储在 AWS Certificate Manager 中的证书

    { "Version": "2012-10-17", "Statement": [ { "Sid": "MSKmTLSACMpolicy", "Effect": "Allow", "Action": [ "acm:ExportCertificate" ], "Resource": [ "arn:aws:acm:us-east-1:444455556666:certificate/certificate_ID" ] } ] }

    AUTHENTICATION MTLS:使用存储在 AWS Secrets Manager 中的证书

    { "Version": "2012-10-17", "Statement": [ { "Sid": "MSKmTLSSecretsManagerpolicy", "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue" ], "Resource": [ "arn:aws:secretsmanager:us-east-1:444455556666:secret:secret_ID" ] } ] }
  2. 检查您的 VPC,并确认您的 Amazon Redshift 集群或 Amazon Redshift Serverless 拥有通往 Amazon MSK 集群的路由。Amazon MSK 集群的入站安全组规则应允许 Amazon Redshift 集群或 Amazon Redshift Serverless 工作组的安全组。指定的端口取决于 Amazon MSK 集群上配置的身份验证方法。有关更多信息,请参阅端口信息从 AWS 内但在 VPC 外部访问

    下表显示了为从 Amazon MSK 进行流式摄取所要设置的免费配置选项:

    Amazon Redshift 配置 Amazon MSK 配置 要在 Redshift 和 Amazon MSK 之间打开的端口
    AUTHENTICATION NONE TLS 传输已禁用 9092
    AUTHENTICATION NONE TLS 传输已启用 9094
    AUTHENTICATION IAM IAM 9098/9198
    AUTHENTICATION MTLS TLS 传输已启用 9094

    Amazon Redshift 身份验证是在 CREATE EXTERNAL SCHEMA 语句中设置的。

    注意

    如果 Amazon MSK 集群启用了相互传输层安全性协议 (mTLS) 身份验证,则将 Amazon Redshift 配置为使用 AUTHENTICATION NONE 会指示它使用端口 9094 进行未经身份验证的访问。但是,由于 mTLS 身份验证正在使用该端口,因此这一过程将失败。因此,我们建议您在使用 mTLS 时切换到 AUTHENTICATION mtls。

  3. 在 Amazon Redshift 集群或 Amazon Redshift Serverless 工作组中启用增强型 VPC 路由。有关更多信息,请参阅启用增强型 VPC 路由

  4. 在 Amazon Redshift 中,创建一个外部 Schema 以映射到 Amazon MSK 集群。该语法如下所示:

    CREATE EXTERNAL SCHEMA MySchema FROM MSK [ IAM_ROLE [ default | 'iam-role-arn' ] ] AUTHENTICATION [ none | iam | mtls ] [AUTHENTICATION_ARN 'acm-certificate-arn' | SECRET_ARN 'ssm-secret-arn' ];

    FROM 子句中,MSK 表示架构映射来自托管式 Kafka 服务的数据。

    AUTHENTICATION 表示使用 Amazon MSK 进行流式摄取的身份验证类型。有三种类型:

    • – 指定不需要身份验证。这相当于 MSK 上的未经身份验证访问。

    • iam – 指定 IAM 身份验证。选择此选项时,请确保 IAM 角色具有 IAM 身份验证的权限。有关设置所需 IAM 策略的更多信息,请参阅 设置 IAM 权限并从 Kafka 执行流式摄取。

    • mtls – 指定双向传输层安全通过促进客户端和服务器之间的身份验证来提供安全通信。在本例中,客户端是 Redshift,服务器是 Amazon MSK。有关使用 mTLS 配置流式摄取的更多信息,请参阅 使用 mTLS 对来自 Amazon MSK 的 Redshift 流式摄取进行身份验证

    请注意,流式摄取不支持使用用户名和密码的 Amazon MSK 身份验证。

    AUTHENTICATION_ARN 指定用于建立加密连接的 ACM 双向传输层安全(mTLS)证书的 ARN。

    SECRET_ARN 指定 AWS Secrets Manager 密钥的 ARN,其中包含 Amazon Redshift 用于 mTLS 的证书。

    以下示例展示了如何在创建外部架构时为 Amazon MSK 集群设置代理 URI:

    CREATE EXTERNAL SCHEMA my_schema FROM MSK IAM_ROLE 'arn:aws:iam::012345678901:role/my_role' AUTHENTICATION IAM URI 'b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098,b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098'

    不使用身份验证:

    CREATE EXTERNAL SCHEMA my_schema FROM MSK AUTHENTICATION none URI 'b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9092,b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9092'

    使用 mTLS

    CREATE EXTERNAL SCHEMA my_schema FROM MSK IAM_ROLE 'arn:aws:iam::012345678901:role/my_role' AUTHENTICATION MTLS URI 'b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9094,b- 2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9094' AUTHENTICATION_ARN 'acm-certificate-arn' | [ SECRET_ARN 'ssm-secret-arn' ];

    有关创建外部架构的更多信息,请参阅 CREATE EXTERNAL SCHEMA

  5. 创建一个实体化视图以使用来自主题的数据。使用 SQL 命令,如以下示例。

    CREATE MATERIALIZED VIEW MyView AUTO REFRESH YES AS SELECT * FROM MySchema."mytopic";

    Kafka 主题名称区分大小写,可以包含大写字母和小写字母。要从名称大写的主题摄取内容,可以在会话或数据库级别将配置 enable_case_sensitive_identifier 设置为 true。有关更多信息,请参阅名称和标识符enable_case_sensitive_identifier

    要开启自动刷新,请使用 AUTO REFRESH YES。默认行为是手动刷新。

    元数据列包括以下内容:

    元数据列 数据类型 描述
    kafka_partition bigint 来自 Kafka 主题的记录的分区 ID
    kafka_offset bigint Kafka 主题中给定分区的记录的偏移
    kafka_timestamp_type char(1)

    Kafka 记录中使用的时间戳类型:

    • C – 客户端的记录创建时间 (CREATE_TIME)

    • L – Kafka 服务器端的记录追加时间 (LOG_APPEND_TIME)

    • U – 记录创建时间不可用 (NO_TIMESTAMP_TYPE)

    kafka_timestamp 不带时区的时间戳 记录的时间戳值
    kafka_key varbyte Kafka 记录的键
    kafka_value varbyte 从 Kafka 收到的记录
    kafka_headers super 从 Kafka 收到的记录的标头
    refresh_time 不带时区的时间戳 刷新开始的时间

    需要注意的是,如果实体化视图定义中的业务逻辑导致业务逻辑错误,在某些情况下可能会导致流式摄取中的摄取失败。这可能会导致您不得不删除实体化视图,然后重新创建。为避免这种情况,我们建议您尽可能简化业务逻辑,并在摄取数据后对数据运行额外的逻辑。

  6. 刷新视图,这会调用 Amazon Redshift 从主题中读取数据并将数据加载到实体化视图中。

    REFRESH MATERIALIZED VIEW MyView;
  7. 在实体化视图中查询数据。

    select * from MyView;

    REFRESH 运行时,直接从主题更新实体化视图。您创建映射到 Kafka 主题数据来源的实体化视图。在实体化视图定义中,您可以对数据执行筛选和聚合。流式摄取实体化视图(基本实体化视图)只能引用一个 Kafka 主题,但是您可以创建额外的实体化视图,以与基本实体化视图和其他实体化视图或表连接使用。

有关流式摄取限制的更多信息,请参阅 流式摄取行为和数据类型