本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Kinesis 连接
您可以使用 Kinesis 连接来对 Amazon Kinesis Data Streams 执行读取和写入操作,方法是通过存储在 Data Catalog 表中的信息进行读取和写入,或通过提供信息直接访问数据流。您可以从 Kinesis 读取信息到 Spark DataFrame 中,然后将其转换为 AWS Glue DynamicFrame。您可以用 JSON 格式将 DynamicFrames 写入 Kinesis。如果直接访问数据流,请使用这些选项提供有关如何访问数据流的信息。
如果您使用 getCatalogSource
或 create_data_frame_from_catalog
使用来自 Kinesis 流式处理源的记录,则任务具有数据目录数据库和表名称信息,将其用于获取一些基本参数,以便从 Kinesis 流式处理源读取数据。如果使用 getSource
、getSourceWithFormat
、createDataFrameFromOptions
或 create_data_frame_from_options
,则必须使用此处描述的连接选项指定这些基本参数。
您可以使用 GlueContext
类中指定方法的以下参数为 Kinesis 指定连接选项。
-
Scala
-
connectionOptions
:与getSource
、createDataFrameFromOptions
、getSink
结合使用 -
additionalOptions
:与getCatalogSource
、getCatalogSink
结合使用 -
options
:与getSourceWithFormat
、getSinkWithFormat
结合使用
-
-
Python
-
connection_options
:与create_data_frame_from_options
、write_dynamic_frame_from_options
结合使用 -
additional_options
:与create_data_frame_from_catalog
、write_dynamic_frame_from_catalog
结合使用 -
options
:与getSource
、getSink
结合使用
-
有关流式处理 ETL 作业的注意事项和限制,请参阅 串流 ETL 注释和限制。
配置 Kinesis
要在 AWS Glue Spark 作业中连接到 Kinesis 数据流,需要具备一些先决条件:
如果读取,AWS Glue 作业必须拥有对 Kinesis 数据流的读取访问权限级别 IAM 权限。
如果写入,AWS Glue 作业必须拥有对 Kinesis 数据流的写入访问权限级别 IAM 权限。
在某些情况下,您需要配置其他先决条件:
-
如果您的 AWS Glue 作业配置了其他网络连接(通常用于连接到其他数据集),并且其中一个连接提供 Amazon VPC 网络选项,则这将引导您的作业通过 Amazon VPC 进行通信。在这种情况下,您还需要将 Kinesis 数据流配置为通过 Amazon VPC 进行通信。您可以通过在 Amazon VPC 和 Kinesis 数据流之间创建接口 VPC 端点实现此目的。有关更多信息,请参阅 Using Kinesis Data Streams with Interface VPC Endpoints。
-
在另一个账户中指定 Amazon Kinesis Data Streams 时,您必须设置角色和策略,从而允许跨账户访问。有关更多信息,请参阅示例:从不同账户的 Kinesis 串流中读取。
有关流式处理 ETL 作业先决条件的更多信息,请参阅 在 AWS Glue 中流式处理 ETL 作业。
示例:从 Kinesis 流读取
示例:从 Kinesis 流读取
与 forEachBatch 结合使用。
Amazon Kinesis 流式处理源示例:
kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
示例:写入 Kinesis 流
示例:从 Kinesis 流读取
与 forEachBatch 结合使用。
Amazon Kinesis 流式处理源示例:
kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
Kinesis 连接选项参考
为 Amazon Kinesis Data Streams 指定连接选项。
为 Kinesis 流式处理数据源使用以下连接选项:
-
"streamARN"
(必填)用于读/写。Kinesis 数据流的 ARN。 -
"classification"
(读取所必填)用于读取。记录中数据使用的文件格式。除非 Data Catalog 提供,否则为必需。 -
"streamName"
–(可选)用于读取。要从中读取的 Kinesis 数据流的名称。与endpointUrl
一起使用。 -
"endpointUrl"
–(可选)用于读取。默认:“https://kinesis.us-east-1.amazonaws.com”。Kinesis 流的 AWS 端点。除非要连接到特殊区域,否则您无需对此进行更改。 -
"partitionKey"
–(可选)用于写入。生成记录时所使用的 Kinesis 分区键。 -
"delimiter"
(可选)用于读取。当classification
为 CSV 时使用的值分隔符。默认为“,
”。 -
"startingPosition"
:(可选)用于读取。要从中读取数据的 Kinesis 数据流中的起始位置。可能的值是"latest"
、"trim_horizon"
、"earliest"
或以模式yyyy-mm-ddTHH:MM:SSZ
采用 UTC 格式的时间戳字符串(其中Z
表示带有 +/-的 UTC 时区偏移量。例如:“2023-04-04T08:00:00-04:00”)。默认值为"latest"
。注意:只有 AWS Glue 版本 4.0 或更高版本支持"startingPosition"
的 UTC 格式中的时间戳字符串。 -
"failOnDataLoss"
:(可选)如果有任何活动分片丢失或已过期,则作业失败。默认值为"false"
。 -
"awsSTSRoleARN"
:(可选)用于读取/写入。要使用 AWS Security Token Service(AWS STS)代入的角色的 Amazon 资源名称(ARN)。此角色必须拥有针对 Kinesis 数据流执行描述或读取记录操作的权限。在访问其他账户中的数据流时,必须使用此参数。与"awsSTSSessionName"
结合使用。 -
"awsSTSSessionName"
:(可选)用于读取/写入。使用 AWS STS 代入角色的会话的标识符。在访问其他账户中的数据流时,必须使用此参数。与"awsSTSRoleARN"
结合使用。 -
"awsSTSEndpoint"
:(可选)使用代入角色连接到 Kinesis 时要使用的 AWS STS 端点。这允许在 VPC 中使用区域 AWS STS 端点,而使用默认的全局端点是不可能的。 -
"maxFetchTimeInMs"
:(可选)用于读取。作业执行程序从 Kinesis 数据流中读取当前批处理记录所花费的最长时间,以毫秒为单位指定。在这段时间内可以进行多次GetRecords
API 调用。默认值为1000
。 -
"maxFetchRecordsPerShard"
:(可选)用于读取。每个微批次将从 Kinesis 数据流中的每个分片获取的最大记录数。注意:如果流式传输作业已经从 Kinesis 读取了额外的记录(在同一个 get-records 调用中),则客户端可以超过此限制。如果maxFetchRecordsPerShard
需要严格,则必须是maxRecordPerRead
的整数倍。默认值为100000
。 -
"maxRecordPerRead"
:(可选)用于读取。每项getRecords
操作中要从 Kinesis 数据流获取的最大记录数。默认值为10000
。 -
"addIdleTimeBetweenReads"
:(可选)用于读取。在两项连续getRecords
操作之间添加时间延迟。默认值为"False"
。此选项仅适用于 Glue 版本 2.0 及更高版本。 -
"idleTimeBetweenReadsInMs"
:(可选)用于读取。两项连续getRecords
操作之间的最短时间延迟,以毫秒为单位。默认值为1000
。此选项仅适用于 Glue 版本 2.0 及更高版本。 -
"describeShardInterval"
:(可选)用于读取。两个ListShards
API 调用之间的最短时间间隔,供您的脚本考虑重新分区。有关更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的重新分区策略。默认值为1s
。 -
"numRetries"
:(可选)用于读取。Kinesis Data Streams API 请求的最大重试次数。默认值为3
。 -
"retryIntervalMs"
:(可选)用于读取。重试 Kinesis Data Streams API 调用之前的冷却时间(以毫秒为单位指定)。默认值为1000
。 -
"maxRetryIntervalMs"
:(可选)用于读取。Kinesis Data Streams API 调用的两次重试之间的最长冷却时间(以毫秒为单位指定)。默认值为10000
。 -
"avoidEmptyBatches"
:(可选)用于读取。在批处理开始之前检查 Kinesis 数据流中是否有未读数据,避免创建空白微批处理任务。默认值为"False"
。 -
"schema"
:(当 inferSchema 设为 false 时为必填)用于读取。用于处理有效负载的架构。如果分类为avro
,则提供的架构必须采用 Avro 架构格式。如果分类不是avro
,则提供的架构必须采用 DDL 架构格式。以下是一些架构示例。
-
"inferSchema"
:(可选)用于读取。默认值为‘false’。如果设置为“true”,则会在运行时检测到foreachbatch
内的有效工作负载中的架构。 -
"avroSchema"
:(已弃用)用于读取。用于指定 Avro 数据架构(使用 Avro 格式时)的参数。此参数现已被弃用。使用schema
参数。 -
"addRecordTimestamp"
:(可选)用于读取。当选项设置为 'true' 时,数据输出将包含一个名为 "__src_timestamp" 的附加列,表示数据流收到相应记录的时间。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。 -
"emitConsumerLagMetrics"
:(可选)用于读取。当选项设置为“true”时,对于每个批次,它将向 CloudWatch 发布数据流接收到的最早记录与该记录到达 AWS Glue 之间的时长指标。指标名称为 "glue.driver.streaming.maxConsumerLagInMs"。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。 -
"fanoutConsumerARN"
:(可选)用于读取。streamARN
中指定的流的 Kinesis 流用户的 ARN。用于为您的 Kinesis 连接启用增强型扇出功能模式。有关使用增强型扇出功能的 Kinesis 流的更多信息,请参阅 在 Kinesis 流作业中使用增强型扇出功能。 -
"recordMaxBufferedTime"
–(可选)用于写入。默认值:1000(ms)。记录在等待写入时缓冲的最长时间。 -
"aggregationEnabled"
–(可选)用于写入。默认值:真。指定是否应在将记录发送到 Kinesis 之前对其进行汇总。 -
"aggregationMaxSize"
–(可选)用于写入。默认值:51200(字节)。如果记录超过了此限制,则它将绕过聚合器。注意:Kinesis 将记录大小限制为 50KB。如果您将其设置为 50KB 以上,Kinesis 将拒绝过大的记录。 -
"aggregationMaxCount"
–(可选)用于写入。默认值:4294967295。要打包至汇总记录的最大项目数量。 -
"producerRateLimit"
–(可选)用于写入。默认值:150(%)。限制从单个生产者(例如您的作业)发送的每个分片的吞吐量,以占后端限制的百分比表示。 -
"collectionMaxCount"
–(可选)用于写入。默认值:500。要装入 PutRecords 请求的最大项目数量。 -
"collectionMaxSize"
–(可选)用于写入。默认值:5242880(字节)。通过 PutRecords 请求发送的最大数据量。