本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Kafka 连接
您可以使用 Kafka 连接使用存储在数据目录表中的信息读取和写入 Kafka 数据流,或者通过提供直接访问数据流的信息。该连接支持 Kafka 集群或适用于 Apache Kafka 的亚马逊托管流媒体 Kafka 集群。你可以从 Kafka 读取信息到 Spark 中 DataFrame,然后将其转换为 AWS Glue。 DynamicFrame你可以用某种JSON格式写 DynamicFrames 入 Kafka。如果直接访问数据流,请使用这些选项提供有关如何访问数据流的信息。
如果您通过 getCatalogSource
或 create_data_frame_from_catalog
来使用来自 Kafka 流式处理源的记录,或通过 getCatalogSink
或 write_dynamic_frame_from_catalog
将记录写入 Kafka,则作业具有 Data Catalog 数据库和表名称信息,并且可以使用该信息来获取一些基本参数,以便从 Kafka 流式处理源读取数据。如果使用 getSource
、getCatalogSink
、getSourceWithFormat
、getSinkWithFormat
、createDataFrameFromOptions
、create_data_frame_from_options
或 write_dynamic_frame_from_catalog
,则必须使用此处描述的连接选项指定这些基本参数。
您可以使用 GlueContext
类中指定方法的以下参数为 Kafka 指定连接选项。
-
Scala
-
connectionOptions
:与getSource
、createDataFrameFromOptions
、getSink
结合使用 -
additionalOptions
:与getCatalogSource
、getCatalogSink
结合使用 -
options
:与getSourceWithFormat
、getSinkWithFormat
结合使用
-
-
Python
-
connection_options
:与create_data_frame_from_options
、write_dynamic_frame_from_options
结合使用 -
additional_options
:与create_data_frame_from_catalog
、write_dynamic_frame_from_catalog
结合使用 -
options
:与getSource
、getSink
结合使用
-
有关直播ETL作业的注意事项和限制,请参阅串流 ETL 注释和限制。
主题
配置 Kafka
连接可通过互联网访问的 Kafka 直播没有任何 AWS 先决条件。
您可以创建 AWS Glue Kafka 连接来管理您的连接凭证。有关更多信息,请参阅 为 Apache Kafka 数据流创建 AWS Glue 连接。在你的 AWS Glue 作业配置中,提供 connectionName
作为额外的网络连接,然后,在你的方法调用中,提供 connectionName
到connectionName
参数。
在某些情况下,您需要配置其他先决条件:
-
如果使用带身份验证的 Apache Kafka 版亚马逊托管流媒体 Kafka,则需要IAM进行适当的配置。IAM
-
如果在亚马逊中使用适用于 Apache Kafka 的亚马逊托管流媒体,则需要适当的VPC亚马逊配置。VPC您需要创建一个提供亚马逊连接 AWS 信息的 Glue VPC 连接。您需要在作业配置中将 Glue AWS 连接作为附加网络连接包括在内。
有关流式处理ETL作业先决条件的更多信息,请参阅在 AWS Glue 中流式处理 ETL 作业。
示例:从 Kafka 流读取
与 forEachBatch 结合使用。
Kafka 流式处理源示例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
示例:写入 Kafka 流
写入 Kafka 的示例:
getSink
方法示例:
data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()
write_dynamic_frame.from_options
方法示例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Kafka 连接选项参考
在读取时,可以使用以下连接选项和 "connectionType": "kafka"
:
-
"bootstrap.servers"
(必需)引导服务器列表URLs,例如,asb-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
。此选项必须在API调用中指定或在数据目录的表元数据中定义。 -
"security.protocol"
(必填)用于与代理通信的协议。可能的值为"SSL"
或"PLAINTEXT"
。 -
"topicName"
(必填)要订阅的以逗号分隔的主题列表。您必须指定"topicName"
、"assign"
或"subscribePattern"
中的其中一个,且只能指定一个。 -
"assign"
:(必需)一个JSON字符串,指定TopicPartitions
要使用的特定内容。您必须指定"topicName"
、"assign"
或"subscribePattern"
中的其中一个,且只能指定一个。例如:“{"topicA":[0,1],"topicB":[2,4]}”
-
"subscribePattern"
:(必需)标识要订阅的主题列表的 Java 正则表达式字符串。您必须指定"topicName"
、"assign"
或"subscribePattern"
中的其中一个,且只能指定一个。示例:“topic.*”
-
"classification"
(必需)记录中数据使用的文件格式。除非 Data Catalog 提供,否则为必需。 -
"delimiter"
(可选)当为时classification
使用的值分隔符CSV。默认为“,
”。 -
"startingOffsets"
:(可选)Kafka 主题中数据读取的起始位置。可能的值为"earliest"
或"latest"
。默认值为"latest"
。 -
"startingTimestamp"
:(可选,仅支持 AWS Glue 4.0 或更高版本)Kafka 主题中要从中读取数据的记录的时间戳。可能的值是模式UTC格式的时间戳字符串yyyy-mm-ddTHH:MM:SSZ
(其中Z
表示带有 +/-UTC 的时区偏移量。 例如:“2023-04-04T 08:00:00-04:00”)。注意: AWS Glue 流式传输脚本的 “连接选项startingTimestamp” 列表中只能出现 “” 或 “” 中的一个,包括这两个属性都会导致任务失败。startingOffsets
-
"endingOffsets"
:(可选)批处理查询结束时的终点。可能的值是"latest"
或为每个值指定结束偏移量的JSON字符串TopicPartition
。对于JSON字符串,格式为
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
。偏移值-1
表示"latest"
。 -
"pollTimeoutMs"
:(可选)Spark 任务执行程序中,从 Kafka 轮询数据的超时时间(以毫秒为单位)。默认值为512
。 -
"numRetries"
:(可选)无法获取 Kafka 偏移时的重试次数。默认值为3
。 -
"retryIntervalMs"
:(可选)重试获取 Kafka 偏移时的等待时间(以毫秒为单位)。默认值为10
。 -
"maxOffsetsPerTrigger"
:(可选)每个触发间隔处理的最大偏移数的速率限制。指定的总偏移数跨不同卷的topicPartitions
按比例分割。默认值为 null,这意味着使用者读取所有偏移,直到已知的最新偏移。 -
"minPartitions"
:(可选)从 Kafka 读取数据的必需最小分区数。默认值为 null,这意味着 Spark 分区数等于 Kafka 分区数。 -
"includeHeaders"
:(可选)是否包含 Kafka 标头。当选项设置为“true”时,数据输出将包含一个名为“glue_streaming_kafka_headers”的附加列,类型为Array[Struct(key: String, value: String)]
。默认值为“false”。此选项仅适用于 AWS Glue 版本 3.0 或更高版本。 -
"schema"
:( inferSchema 设置为 false 时为必填项)用于处理负载的架构。如果分类为avro
,则提供的架构必须采用 Avro 架构格式。如果分类不是avro
,则提供的架构必须采用DDL架构格式。以下是一些架构示例。
-
"inferSchema"
:(可选)默认值为“false”。如果设置为“true”,则会在运行时检测到foreachbatch
内的有效工作负载中的架构。 -
"avroSchema"
:(已弃用)用于指定 Avro 数据架构(使用 Avro 格式时)的参数。此参数现已被弃用。使用schema
参数。 -
"addRecordTimestamp"
:(可选)当选项设置为 'true' 时,数据输出将包含一个名为 "__src_timestamp" 的附加列,表示主题收到相应记录的时间。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。 -
"emitConsumerLagMetrics"
:(可选)当该选项设置为 “true” 时,对于每个批次,它将发布从主题收到的最旧记录到该记录到达的时间之间的持续时间内的AWS Glue指标。 CloudWatch该指标的名字是 “glue.driver.streaming”。 maxConsumerLagInMs”。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。
在写入时,可以使用以下连接选项和 "connectionType": "kafka"
:
-
"connectionName"
(必填)用于连接到 Kafka 集群的 AWS Glue 连接的名称(类似于 Kafka 源代码)。 -
"topic"
(必填项)如果存在主题列,则在将给定行写入 Kafka 时,除非设置了主题配置选项,否则其值将用作主题。也就是说,topic
配置选项会覆盖主题列。 -
"partition"
(可选)如果指定了有效的分区号,则partition
将在发送记录时使用该分区号。如果未指定分区但存在
key
,则将使用该键的哈希值来选择分区。如果
key
和partition
都不存在,则将根据在向分区至少生成了 batch.size 字节的数据时,对这些更改进行粘性分区来选择分区。 -
"key"
(可选)如果partition
为空,则用于分区。 -
"classification"
(可选)记录中数据使用的文件格式。我们只支持JSON,CSV还有 Avro。使用 Avro 格式,我们可以提供用于序列化的自定义 avroSchema ,但请注意,还需要在源代码中提供该格式以进行反序列化。否则,默认情况下它使用 Apache AvroSchema 进行序列化。
此外,您可以在需要时通过更新 Kafka 生产者配置参数
虽然存在一个很小的选项拒绝列表,不过不会生效。有关更多信息,请参阅 Kafka specific configurations