Kinesis 连接 - AWS Glue

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Kinesis 连接

您可以使用 Kinesis 连接来对 Amazon Kinesis Data Streams 执行读取和写入操作,方法是通过存储在 Data Catalog 表中的信息进行读取和写入,或通过提供信息直接访问数据流。您可以从 Kinesis 读取信息到 Spark DataFrame 中,然后将其转换为 AWS Glue DynamicFrame。您可以用 JSON 格式将 DynamicFrames 写入 Kinesis。如果直接访问数据流,请使用这些选项提供有关如何访问数据流的信息。

如果您使用 getCatalogSourcecreate_data_frame_from_catalog 使用来自 Kinesis 流式处理源的记录,则任务具有数据目录数据库和表名称信息,将其用于获取一些基本参数,以便从 Kinesis 流式处理源读取数据。如果使用 getSourcegetSourceWithFormatcreateDataFrameFromOptionscreate_data_frame_from_options,则必须使用此处描述的连接选项指定这些基本参数。

您可以使用 GlueContext 类中指定方法的以下参数为 Kinesis 指定连接选项。

  • Scala

    • connectionOptions:与 getSourcecreateDataFrameFromOptionsgetSink 结合使用

    • additionalOptions:与 getCatalogSourcegetCatalogSink 结合使用

    • options:与 getSourceWithFormatgetSinkWithFormat 结合使用

  • Python

    • connection_options:与 create_data_frame_from_optionswrite_dynamic_frame_from_options 结合使用

    • additional_options:与 create_data_frame_from_catalogwrite_dynamic_frame_from_catalog 结合使用

    • options:与 getSourcegetSink 结合使用

有关流式处理 ETL 作业的注意事项和限制,请参阅 串流 ETL 注释和限制

配置 Kinesis

要在 AWS Glue Spark 作业中连接到 Kinesis 数据流,需要具备一些先决条件:

  • 如果读取,AWS Glue 作业必须拥有对 Kinesis 数据流的读取访问权限级别 IAM 权限。

  • 如果写入,AWS Glue 作业必须拥有对 Kinesis 数据流的写入访问权限级别 IAM 权限。

在某些情况下,您需要配置其他先决条件:

  • 如果您的 AWS Glue 作业配置了其他网络连接(通常用于连接到其他数据集),并且其中一个连接提供 Amazon VPC 网络选项,则这将引导您的作业通过 Amazon VPC 进行通信。在这种情况下,您还需要将 Kinesis 数据流配置为通过 Amazon VPC 进行通信。您可以通过在 Amazon VPC 和 Kinesis 数据流之间创建接口 VPC 端点实现此目的。有关更多信息,请参阅 Using Kinesis Data Streams with Interface VPC Endpoints

  • 在另一个账户中指定 Amazon Kinesis Data Streams 时,您必须设置角色和策略,从而允许跨账户访问。有关更多信息,请参阅示例:从不同账户的 Kinesis 串流中读取

有关流式处理 ETL 作业先决条件的更多信息,请参阅 在 AWS Glue 中流式处理 ETL 作业

示例:从 Kinesis 流读取

示例:从 Kinesis 流读取

forEachBatch 结合使用。

Amazon Kinesis 流式处理源示例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

示例:写入 Kinesis 流

示例:从 Kinesis 流读取

forEachBatch 结合使用。

Amazon Kinesis 流式处理源示例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Kinesis 连接选项参考

为 Amazon Kinesis Data Streams 指定连接选项。

为 Kinesis 流式处理数据源使用以下连接选项:

  • "streamARN"(必填)用于读/写。Kinesis 数据流的 ARN。

  • "classification"(读取所必填)用于读取。记录中数据使用的文件格式。除非 Data Catalog 提供,否则为必需。

  • "streamName" –(可选)用于读取。要从中读取的 Kinesis 数据流的名称。与 endpointUrl 一起使用。

  • "endpointUrl" –(可选)用于读取。默认:“https://kinesis.us-east-1.amazonaws.com”。Kinesis 流的 AWS 端点。除非要连接到特殊区域,否则您无需对此进行更改。

  • "partitionKey" –(可选)用于写入。生成记录时所使用的 Kinesis 分区键。

  • "delimiter"(可选)用于读取。当 classification 为 CSV 时使用的值分隔符。默认为“,”。

  • "startingPosition":(可选)用于读取。要从中读取数据的 Kinesis 数据流中的起始位置。可能的值是 "latest""trim_horizon""earliest" 或以模式 yyyy-mm-ddTHH:MM:SSZ 采用 UTC 格式的时间戳字符串(其中 Z 表示带有 +/-的 UTC 时区偏移量。例如:“2023-04-04T08:00:00-04:00”)。默认值为 "latest"。注意:只有 AWS Glue 版本 4.0 或更高版本支持 "startingPosition" 的 UTC 格式中的时间戳字符串。

  • "failOnDataLoss":(可选)如果有任何活动分片丢失或已过期,则作业失败。默认值为 "false"

  • "awsSTSRoleARN":(可选)用于读取/写入。要使用 AWS Security Token Service(AWS STS)代入的角色的 Amazon 资源名称(ARN)。此角色必须拥有针对 Kinesis 数据流执行描述或读取记录操作的权限。在访问其他账户中的数据流时,必须使用此参数。与 "awsSTSSessionName" 结合使用。

  • "awsSTSSessionName":(可选)用于读取/写入。使用 AWS STS 代入角色的会话的标识符。在访问其他账户中的数据流时,必须使用此参数。与 "awsSTSRoleARN" 结合使用。

  • "awsSTSEndpoint":(可选)使用代入角色连接到 Kinesis 时要使用的 AWS STS 端点。这允许在 VPC 中使用区域 AWS STS 端点,而使用默认的全局端点是不可能的。

  • "maxFetchTimeInMs":(可选)用于读取。作业执行程序从 Kinesis 数据流中读取当前批处理记录所花费的最长时间,以毫秒为单位指定。在这段时间内可以进行多次 GetRecords API 调用。默认值为 1000

  • "maxFetchRecordsPerShard":(可选)用于读取。每个微批次将从 Kinesis 数据流中的每个分片获取的最大记录数。注意:如果流式传输作业已经从 Kinesis 读取了额外的记录(在同一个 get-records 调用中),则客户端可以超过此限制。如果 maxFetchRecordsPerShard 需要严格,则必须是 maxRecordPerRead 的整数倍。默认值为 100000

  • "maxRecordPerRead":(可选)用于读取。每项 getRecords 操作中要从 Kinesis 数据流获取的最大记录数。默认值为 10000

  • "addIdleTimeBetweenReads":(可选)用于读取。在两项连续 getRecords 操作之间添加时间延迟。默认值为 "False"。此选项仅适用于 Glue 版本 2.0 及更高版本。

  • "idleTimeBetweenReadsInMs":(可选)用于读取。两项连续 getRecords 操作之间的最短时间延迟,以毫秒为单位。默认值为 1000。此选项仅适用于 Glue 版本 2.0 及更高版本。

  • "describeShardInterval":(可选)用于读取。两个 ListShards API 调用之间的最短时间间隔,供您的脚本考虑重新分区。有关更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的重新分区策略。默认值为 1s

  • "numRetries":(可选)用于读取。Kinesis Data Streams API 请求的最大重试次数。默认值为 3

  • "retryIntervalMs":(可选)用于读取。重试 Kinesis Data Streams API 调用之前的冷却时间(以毫秒为单位指定)。默认值为 1000

  • "maxRetryIntervalMs":(可选)用于读取。Kinesis Data Streams API 调用的两次重试之间的最长冷却时间(以毫秒为单位指定)。默认值为 10000

  • "avoidEmptyBatches":(可选)用于读取。在批处理开始之前检查 Kinesis 数据流中是否有未读数据,避免创建空白微批处理任务。默认值为 "False"

  • "schema":(当 inferSchema 设为 false 时为必填)用于读取。用于处理有效负载的架构。如果分类为 avro,则提供的架构必须采用 Avro 架构格式。如果分类不是 avro,则提供的架构必须采用 DDL 架构格式。

    以下是一些架构示例。

    Example in DDL schema format
    `column1` INT, `column2` STRING , `column3` FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema":(可选)用于读取。默认值为‘false’。如果设置为“true”,则会在运行时检测到 foreachbatch 内的有效工作负载中的架构。

  • "avroSchema":(已弃用)用于读取。用于指定 Avro 数据架构(使用 Avro 格式时)的参数。此参数现已被弃用。使用 schema 参数。

  • "addRecordTimestamp":(可选)用于读取。当选项设置为 'true' 时,数据输出将包含一个名为 "__src_timestamp" 的附加列,表示数据流收到相应记录的时间。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。

  • "emitConsumerLagMetrics":(可选)用于读取。当选项设置为“true”时,对于每个批次,它将向 CloudWatch 发布数据流接收到的最早记录与该记录到达 AWS Glue 之间的时长指标。指标名称为 "glue.driver.streaming.maxConsumerLagInMs"。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。

  • "fanoutConsumerARN":(可选)用于读取。streamARN 中指定的流的 Kinesis 流用户的 ARN。用于为您的 Kinesis 连接启用增强型扇出功能模式。有关使用增强型扇出功能的 Kinesis 流的更多信息,请参阅 在 Kinesis 流作业中使用增强型扇出功能

  • "recordMaxBufferedTime" –(可选)用于写入。默认值:1000(ms)。记录在等待写入时缓冲的最长时间。

  • "aggregationEnabled" –(可选)用于写入。默认值:真。指定是否应在将记录发送到 Kinesis 之前对其进行汇总。

  • "aggregationMaxSize" –(可选)用于写入。默认值:51200(字节)。如果记录超过了此限制,则它将绕过聚合器。注意:Kinesis 将记录大小限制为 50KB。如果您将其设置为 50KB 以上,Kinesis 将拒绝过大的记录。

  • "aggregationMaxCount" –(可选)用于写入。默认值:4294967295。要打包至汇总记录的最大项目数量。

  • "producerRateLimit" –(可选)用于写入。默认值:150(%)。限制从单个生产者(例如您的作业)发送的每个分片的吞吐量,以占后端限制的百分比表示。

  • "collectionMaxCount" –(可选)用于写入。默认值:500。要装入 PutRecords 请求的最大项目数量。

  • "collectionMaxSize" –(可选)用于写入。默认值:5242880(字节)。通过 PutRecords 请求发送的最大数据量。