Kafka 连接 - AWS Glue

Kafka 连接

您可以使用 Kafka 连接来对 Kafka 数据流执行读取和写入操作,方法是通过存储在 Data Catalog 表中的信息进行读取和写入,或通过提供信息直接访问数据流。该连接支持 Kafka 集群或 Amazon Managed Streaming for Apache Kafka 集群。您可以将 Kafka 中的信息读取到 Spark DataFrame 中,然后将其转换为 AWS Glue DynamicFrame。您可以用 JSON 格式将 DynamicFrames 写入 Kafka。如果直接访问数据流,请使用这些选项提供有关如何访问数据流的信息。

如果您通过 getCatalogSourcecreate_data_frame_from_catalog 来使用来自 Kafka 流式处理源的记录,或通过 getCatalogSinkwrite_dynamic_frame_from_catalog 将记录写入 Kafka,则作业具有 Data Catalog 数据库和表名称信息,并且可以使用该信息来获取一些基本参数,以便从 Kafka 流式处理源读取数据。如果使用 getSourcegetCatalogSinkgetSourceWithFormatgetSinkWithFormatcreateDataFrameFromOptionscreate_data_frame_from_optionswrite_dynamic_frame_from_catalog,则必须使用此处描述的连接选项指定这些基本参数。

您可以使用 GlueContext 类中指定方法的以下参数为 Kafka 指定连接选项。

  • Scala

    • connectionOptions:与 getSourcecreateDataFrameFromOptionsgetSink 结合使用

    • additionalOptions:与 getCatalogSourcegetCatalogSink 结合使用

    • options:与 getSourceWithFormatgetSinkWithFormat 结合使用

  • Python

    • connection_options:与 create_data_frame_from_optionswrite_dynamic_frame_from_options 结合使用

    • additional_options:与 create_data_frame_from_catalogwrite_dynamic_frame_from_catalog 结合使用

    • options:与 getSourcegetSink 结合使用

有关流式处理 ETL 作业的注意事项和限制,请参阅 串流 ETL 注释和限制

主题

    配置 Kafka

    连接到可通过互联网访问的 Kafka 流没有任何 AWS 先决条件。

    您可以创建 AWS Glue Kafka 连接来管理连接凭证。有关更多信息,请参阅 为 Apache Kafka 数据流创建 AWS Glue 连接。在 AWS Glue 作业配置中,提供 connectionName 作为附加网络连接,然后在方法调用中为 connectionName 参数提供 connectionName

    在某些情况下,您需要配置其他先决条件:

    • 如果使用 Amazon Managed Streaming for Apache Kafka 搭配 IAM 身份验证,则需要适当的 IAM 配置。

    • 如果在 Amazon VPC 内使用 Amazon Managed Streaming for Apache Kafka,则需要适当的 Amazon VPC 配置。您需要创建一个提供 Amazon VPC 连接信息的 AWS Glue 连接。您需要在作业配置中将 AWS Glue 连接作为附加网络连接包括在内。

    有关流式处理 ETL 作业先决条件的更多信息,请参阅 在 AWS Glue 中流式处理 ETL 作业

    示例:从 Kafka 流读取

    forEachBatch 结合使用。

    Kafka 流式处理源示例:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    示例:写入 Kafka 流

    写入 Kafka 的示例:

    getSink 方法示例:

    data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()

    write_dynamic_frame.from_options 方法示例:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    Kafka 连接选项参考

    在读取时,可以使用以下连接选项和 "connectionType": "kafka"

    • "bootstrap.servers"(必需)引导服务器 URL 的列表,例如,作为 b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094。此选项必须在 API 调用中指定,或在数据目录的表元数据中定义。

    • "security.protocol"(必填)用于与代理通信的协议。可能的值为 "SSL""PLAINTEXT"

    • "topicName"(必填)要订阅的以逗号分隔的主题列表。您必须指定"topicName""assign""subscribePattern" 中的其中一个,且只能指定一个。

    • "assign":(必填)用于指定要使用的 TopicPartitions 的 JSON 字符串。您必须指定"topicName""assign""subscribePattern" 中的其中一个,且只能指定一个。

      例如:“{"topicA":[0,1],"topicB":[2,4]}”

    • "subscribePattern":(必需)标识要订阅的主题列表的 Java 正则表达式字符串。您必须指定"topicName""assign""subscribePattern" 中的其中一个,且只能指定一个。

      示例:“topic.*”

    • "classification"(必需)记录中数据使用的文件格式。除非 Data Catalog 提供,否则为必需。

    • "delimiter"(可选)当 classification 为 CSV 时使用的值分隔符。默认为“,”。

    • "startingOffsets":(可选)Kafka 主题中数据读取的起始位置。可能的值为 "earliest""latest"。默认值为 "latest"

    • "startingTimestamp":(可选,仅 AWS Glue 4.0 或更高版本支持)Kafka 主题中数据读取的记录的时间戳。可能的值是以模式 yyyy-mm-ddTHH:MM:SSZ 采用 UTC 格式的时间戳字符串(其中 Z 表示带有 +/-的 UTC 时区偏移量。例如:“2023-04-04T08:00:00-04:00”)。

      注意:AWS Glue 流式处理脚本的“连接选项”列表中只能出现“startingOffsets”或“startingTimestamp”中的一个,同时包括这两个属性会导致作业失败。

    • "endingOffsets":(可选)批处理查询结束时的终点。可能值为 "latest",或者为每个 TopicPartition 指定结束偏移的 JSON 字符串。

      对于 JSON 字符串,格式为 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}。偏移值 -1 表示 "latest"

    • "pollTimeoutMs":(可选)Spark 任务执行程序中,从 Kafka 轮询数据的超时时间(以毫秒为单位)。默认值为 512

    • "numRetries":(可选)无法获取 Kafka 偏移时的重试次数。默认值为 3

    • "retryIntervalMs":(可选)重试获取 Kafka 偏移时的等待时间(以毫秒为单位)。默认值为 10

    • "maxOffsetsPerTrigger":(可选)每个触发间隔处理的最大偏移数的速率限制。指定的总偏移数跨不同卷的 topicPartitions 按比例分割。默认值为 null,这意味着使用者读取所有偏移,直到已知的最新偏移。

    • "minPartitions":(可选)从 Kafka 读取数据的必需最小分区数。默认值为 null,这意味着 Spark 分区数等于 Kafka 分区数。

    • "includeHeaders":(可选)是否包含 Kafka 标头。当选项设置为“true”时,数据输出将包含一个名为“glue_streaming_kafka_headers”的附加列,类型为 Array[Struct(key: String, value: String)]。默认值为“false”。此选项仅适用于 AWS Glue 版本 3.0 或更高版本。

    • "schema":(当 inferSchema 设为 false 时为必填)用于处理有效工作负载的架构。如果分类为 avro,则提供的架构必须采用 Avro 架构格式。如果分类不是 avro,则提供的架构必须采用 DDL 架构格式。

      以下是一些架构示例。

      Example in DDL schema format
      'column1' INT, 'column2' STRING , 'column3' FLOAT
      Example in Avro schema format
      { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    • "inferSchema":(可选)默认值为“false”。如果设置为“true”,则会在运行时检测到 foreachbatch 内的有效工作负载中的架构。

    • "avroSchema":(已弃用)用于指定 Avro 数据架构(使用 Avro 格式时)的参数。此参数现已被弃用。使用 schema 参数。

    • "addRecordTimestamp":(可选)当选项设置为 'true' 时,数据输出将包含一个名为 "__src_timestamp" 的附加列,表示主题收到相应记录的时间。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。

    • "emitConsumerLagMetrics":(可选)当该选项设置为 'true' 时,对于每个批次,它会向 CloudWatch 发布主题收到的最早记录与该记录到达 AWS Glue 之间的时长指标。指标名称为 "glue.driver.streaming.maxConsumerLagInMs"。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。

    在写入时,可以使用以下连接选项和 "connectionType": "kafka"

    • "connectionName"(必填项)用于连接到 Kafka 集群(类似于 Kafka 源)的 AWS Glue 连接的名称。

    • "topic"(必填项)如果存在主题列,则在将给定行写入 Kafka 时,除非设置了主题配置选项,否则其值将用作主题。也就是说,topic 配置选项会覆盖主题列。

    • "partition"(可选)如果指定了有效的分区号,则 partition 将在发送记录时使用该分区号。

      如果未指定分区但存在 key,则将使用该键的哈希值来选择分区。

      如果 keypartition 都不存在,则将根据在向分区至少生成了 batch.size 字节的数据时,对这些更改进行粘性分区来选择分区。

    • "key"(可选)如果 partition 为空,则用于分区。

    • "classification"(可选)记录中数据使用的文件格式。我们只支持 JSON、CSV 和 Avro。

      对于 Avro 格式,我们可以提供自定义 avroSchema 来进行序列化,但请注意,还需要在源中提供该格式以进行反序列化。否则,默认情况下它将使用 Apache AvroSchema 进行序列化。

    此外,您可以在需要时通过更新 Kafka 生产者配置参数来微调 Kafka 接收器。请注意,不存在有关连接选项的允许列表,所有键值对都按原样保存在接收器上。

    虽然存在一个很小的选项拒绝列表,不过不会生效。有关更多信息,请参阅 Kafka specific configurations