AWS Glue 串流連線 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

AWS Glue 串流連線

以下各節提供有關如何在「 AWS Glue 串流」中使用連線的資訊。

使用卡夫卡連接

您可以使用卡夫卡連接讀取和寫入使用存儲在數據目錄表中的信息,或通過提供信息直接訪問數據流卡夫卡數據流。此連線支援卡夫卡叢集或 Amazon 管理的 Apache 卡夫卡叢集串流。您可以從卡夫卡讀取信息到火花 DataFrame,然後將其轉換為 Glue。 AWS DynamicFrame您可以以一JSON種格式寫信給 DynamicFrames 卡夫卡。如果您直接存取資料串流,則請使用這些選項來提供如何存取資料串流的相關資訊。

如果您使用getCatalogSourcecreate_data_frame_from_catalog使用卡夫卡流源中的記錄,getCatalogSink或者write_dynamic_frame_from_catalog將記錄寫入卡夫卡,並且該作業具有數據目錄數據庫和表名信息,則可以使用該信息來獲取從卡夫卡流源讀取一些基本參數。如果使用getSource、、getCatalogSinkgetSourceWithFormatcreateDataFrameFromOptionsgetSinkWithFormat create_data_frame_from_optionswrite_dynamic_frame_from_catalog,則必須使用此處描述的連接選項來指定這些基本參數。

您可以使用類中的指定方法下面的參數指定卡夫卡的連接選項。GlueContext

  • Scala

    • connectionOptions:與 getSourcecreateDataFrameFromOptionsgetSink 搭配使用

    • additionalOptions:與 getCatalogSourcegetCatalogSink 搭配使用。

    • options:與 getSourceWithFormatgetSinkWithFormat 搭配使用。

  • Python

    • connection_options:與 create_data_frame_from_optionswrite_dynamic_frame_from_options 搭配使用。

    • additional_options:與 create_data_frame_from_catalogwrite_dynamic_frame_from_catalog 搭配使用。

    • options:與 getSourcegetSink 搭配使用。

如需有關串流ETL工作的注意事項和限制,請參閱串流 ETL 注意事項和限制

主題

    設定 Kafka

    連接到通過互聯網可用的卡夫卡流沒有 AWS 先決條件。

    您可以建立 AWS Glue Kafka 連線來管理您的連線認證。如需詳細資訊,請參閱為 Apache Kafka 資料串流建立 AWS Glue 連線。在您的 AWS Glue 工作組態中,提供 connectionName 作為附加網絡連接,然後在您的方法調用中提供 connectionName 到參connectionName數。

    在某些情況下,您需要設定其他先決條件:

    • 如果將 Apache Kafka 的 Amazon 受管串流與IAM身份驗證搭配使用,您將需要適當IAM的設定。

    • 如果在 Amazon 內為 Apache 卡夫卡使用 Amazon 託管流VPC,則需要適當的 Amazon VPC 配置。您必須建立可提供 Amazon 連線資訊的 AWS Glue VPC 連線。您需要工作組態,才能將 AWS Glue 連線納入為其他網路連線

    如需串流ETL工作先決條件的詳細資訊,請參閱在 AWS Glue 中串流 ETL 任務

    範例:從 Kafka 串流讀取

    搭配 forEachBatch 使用。

    Kafka 串流來源範例:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    範例:寫入卡夫卡串流

    寫信給卡夫卡的例子:

    使用該getSink方法的示例:

    data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()

    使用該write_dynamic_frame.from_options方法的示例:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    Kafka 連線選項參考

    閱讀時,請使用以下連接選項"connectionType": "kafka"

    • "bootstrap.servers"(必要) 啟動程序伺服器清單URLs,例如,as b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094。必須在API呼叫中指定此選項,或在「資料目錄」的表格中繼資料中定義此選項。

    • "security.protocol"(必要) 用來與代理程式通訊的協定。可能的值為 "SSL""PLAINTEXT"

    • "topicName" (必要) 要訂閱的主題清單 (以逗號分隔)。您必須指定 "topicName""assign""subscribePattern" 其中一個。

    • "assign": (必要) JSON 字串,指定要使用的特TopicPartitions定項目。您必須指定 "topicName""assign""subscribePattern" 其中一個。

      範例:'{"topicA":[0,1],"topicB":[2,4]}'

    • "subscribePattern":(必要) 識別要訂閱的主題清單的 Java regex 字串。您必須指定 "topicName""assign""subscribePattern" 其中一個。

      範例:'topic.*'

    • "classification" (必要) 記錄中資料使用的檔案格式。除非透過資料型錄提供,否則為必要。

    • "delimiter"(選擇性) 值分隔符號使用時classification為CSV。預設值為 ","。

    • "startingOffsets":(選用) 要從中讀取資料的 Kafka 主題的起始位置。可能的值為 "earliest""latest"。預設值為 "latest"

    • "startingTimestamp": (選用,僅適用於 AWS Glue 4.0 版或更新版本) Kafka 主題中要讀取資料的記錄時間戳記。可能的值是模UTC式中格式的時間戳記字串 yyyy-mm-ddTHH:MM:SSZ (其中Z代表具有 +/-的UTC時區偏移量。 例如:「4 月 4 日上午 8 時至 4 時」)。

      注意: AWS Glue 串流指令集的 startingTimestamp「連線選項」清單中只能有一個 '' 或 '',包括這兩個屬性會導致工作失敗。startingOffsets

    • "endingOffsets":(選用) 批次查詢結束時的終點。可能的值是"latest"或指定每個結束偏移量的JSON字符串TopicPartition

      對於字JSON符串,格式是{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}。值 -1 作為偏移代表 "latest"

    • "pollTimeoutMs":(選用) 在 Spark 任務執行器中從 Kafka 輪詢資料的逾時 (以毫秒為單位)。預設值為 512

    • "numRetries":(選用) 擷取 Kafka 位移失敗之前,要重試的次數。預設值為 3

    • "retryIntervalMs":(選用) 重試擷取 Kafka 偏移量之前等待的時間 (毫秒)。預設值為 10

    • "maxOffsetsPerTrigger":(選用) 每個觸發間隔所處理之偏移數目上限的速率限制。指定的偏移總數會按比例跨 topicPartitions 或不同磁碟區而分割。預設值為 null,這表示消費者讀取所有偏移,直到已知的最新偏移。

    • "minPartitions":(選用) 從 Kafka 讀取所需的分割區最小數量。預設值為 null,這表示 Spark 分割區的數量等於 Kafka 分割區的數量。

    • "includeHeaders":(選用) 是否包含 Kafka 標頭。當選項設定為「true」時,資料輸出將包含一個名為「glue_streaming_kafka_headers」的額外欄,其類型為 Array[Struct(key: String, value: String)]。預設值為 "false"。此選項能在 AWS Glue 3.0 版或更新版中使用。

    • "schema": (當 inferSchema 設定為 false 時需要) 用來處理承載的結構描述。如果分類為 avro,提供的架構必須採用 Avro 架構格式。如果分類不是提供avro的模式必須是DDL模式格式。

      以下是架構範例。

      Example in DDL schema format
      'column1' INT, 'column2' STRING , 'column3' FLOAT
      Example in Avro schema format
      { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    • "inferSchema":(選用) 預設值為 'false'。如果設為 'true',將在執行時間時從 foreachbatch 承載偵測架構。

    • "avroSchema":(已棄用) 使用 Avro 格式時,用於指定 Avro 資料架構的參數。此參數現已棄用。使用 schema 參數。

    • "addRecordTimestamp"︰(選用) 當此選項設定為 'true' 時,資料輸出將包含一個名為 "__src_timestamp" 的額外資料欄,其指示主題收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。

    • "emitConsumerLagMetrics": (選擇性) 當選項設為 'true' 時,對於每個批次,它會發出主題接收到的最舊記錄到達時間之間的持續時間的AWS Glue指標。 CloudWatch該度量標準的名稱是「膠合. 驅動程序. maxConsumerLagInMs」. 預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。

    寫入時,請使用以下連接選項"connectionType": "kafka"

    • "connectionName"(必要)用於連接到卡夫卡集群的 AWS Glue 連接名稱(類似於卡夫卡源)。

    • "topic"(必要) 如果主題欄存在,則除非設定了主題組態選項,否則在將指定資料列寫入 Kafka 時,會使用其值作為主題。也就是說,組topic態選項會覆寫主題欄。

    • "partition"(選擇性) 如果指定了有效的分割區編號,partition將在傳送記錄時使用。

      如果沒有指定分區,但存key在一個分區,將使用密鑰的散列來選擇一個分區。

      如果key既不存在partition也不存在,則當至少為分區產生 batch .size 字節時,將根據粘性分區這些更改選擇分區。

    • "key"(選擇性) 如partition果為 null,則用於分割。

    • "classification"(選擇性) 記錄中資料所使用的檔案格式。我們只支持JSONCSV和阿夫羅。

      使用 Avro 格式,我們可以提供一個自 avroSchema 定義序列化,但請注意,這也需要在源代碼上提供反序列化。否則,默認情況下它使用 Apache AvroSchema 進行序列化。

    此外,您可以根據需要通過更新卡夫卡生產者配置參數微調卡夫卡水槽。請注意,連接選項上沒有允許列出,所有鍵值對都保留在接收器上。

    但是,有一個小的拒絕列表的選項不會生效。如需詳細資訊,請參閱 Kafka 特定組態。

    使用 Kinesis 連接

    您可以使用 Kinesis 連線讀取和寫入 Amazon Kinesis 資料串流,使用儲存在資料目錄表格中的資訊,或提供直接存取資料串流的資訊。您可以從 Kinesis 讀取信息到火花 DataFrame,然後將其轉換為 AWS Glue DynamicFrame。您可以 DynamicFrames 用一種JSON格式寫入 Kinesis。如果您直接存取資料串流,則請使用這些選項來提供如何存取資料串流的相關資訊。

    如果您使用 getCatalogSourcecreate_data_frame_from_catalog 來取用來自 Kinesis 串流來源的記錄,則該任務具有 Data Catalog 資料庫和資料表名稱資訊,並可以使用它來獲取一些從 Kinesis 串流來源讀取的基本參數。如果使用 getSourcegetSourceWithFormatcreateDataFrameFromOptionscreate_data_frame_from_options,則您必須使用此處描述的連線選項來指定這些基本參數。

    您可以使用 GlueContext 類別中指定方法的下列引數來指定 Kinesis 的連線選項。

    • Scala

      • connectionOptions:與 getSourcecreateDataFrameFromOptionsgetSink 搭配使用

      • additionalOptions:與 getCatalogSourcegetCatalogSink 搭配使用。

      • options:與 getSourceWithFormatgetSinkWithFormat 搭配使用。

    • Python

      • connection_options:與 create_data_frame_from_optionswrite_dynamic_frame_from_options 搭配使用。

      • additional_options:與 create_data_frame_from_catalogwrite_dynamic_frame_from_catalog 搭配使用。

      • options:與 getSourcegetSink 搭配使用。

    如需有關串流ETL工作的注意事項和限制,請參閱串流 ETL 注意事項和限制

    設定 Kinesis

    若要連線到 AWS Glue Spark 工作中的 Kinesis 資料串流,您需要一些先決條件:

    • 如果讀取, AWS Glue 工作必須具有 Kinesis 資料串流的讀取存取等級IAM權限。

    • 如果要寫入, AWS Glue 工作必須具有 Kinesis 資料串流的寫入存取層級IAM權限。

    在某些情況下,您需要設定其他先決條件:

    • 如果您的 AWS Glue 任務設定了「其他網路連線」(通常是用來連接到其他資料集),而其中一個連線提供 Amazon VPC 網路選項,則會引導您的任務透過 Amazon 進行通訊VPC。在這種情況下,您還需要設定 Kinesis 資料串流,以便透過 Amazon VPC 進行通訊。您可以在 Amazon VPC 和 Kinesis 資料串流之間建立介面VPC端點來執行此操作。如需詳細資訊,請參閱將 Kinesis Data Streams 與介面VPC端點搭配使用。

    • 在另一個帳戶中指定 Amazon Kinesis Data Streams 時,您必須設定角色和政策以允許跨帳戶存取。如需詳細資訊,請參閱範例:從不同帳戶中的 Kinesis 串流讀取

    如需串流ETL工作先決條件的詳細資訊,請參閱在 AWS Glue 中串流 ETL 任務

    從 Kinesis 讀取

    範例:從 Kinesis 串流讀取

    搭配 forEachBatch 使用。

    Amazon Kinesis 串流來源範例:

    kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

    寫入 Kinesis

    範例:寫入 Kinesis 串流

    搭配 forEachBatch 使用。您 DynamicFrame 將以一種JSON格式寫入流。如果任務在數次重試後仍無法寫入,便會失敗。依預設,每 DynamicFrame 筆記錄都會個別傳送至 Kinesis 串流。您可以使用 aggregationEnabled 和關聯的參數來設定此行為。

    從串流任務寫入 Amazon Kinesis 的範例:

    Python
    glueContext.write_dynamic_frame.from_options( frame=frameToWrite connection_type="kinesis", connection_options={ "partitionKey": "part1", "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/streamName", } )
    Scala
    glueContext.getSinkWithFormat( connectionType="kinesis", options=JsonOptions("""{ "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/streamName", "partitionKey": "part1" }"""), ) .writeDynamicFrame(frameToWrite)

    Kinesis 連線參數

    指定 Amazon Kinesis Data Streams 的連線選項。

    針對 Kinesis 串流資料來源使用下列的連線選項:

    • "streamARN" (必要) 用於讀取/寫入。Kinesis 資料串流ARN的。

    • "classification" (讀取時為必要) 用於讀取。記錄中資料使用的檔案格式。除非透過資料型錄提供,否則為必要。

    • "streamName" – (選用) 用於讀取。要從中讀取的 Kinesis 資料串流名稱。與 endpointUrl 搭配使用。

    • "endpointUrl" – (選用) 用於讀取。預設值:"https://kinesis.us-east-1.amazonaws.com"。Kinesis 串流的 AWS 端點。除非您要連線到特殊區域,否則無需變更此設定。

    • "partitionKey" – (選用) 用於寫入。在產生記錄時使用的 Kinesis 分割區索引鍵。

    • "delimiter" (選用) 用於讀取。當classification是時使用的值分隔符號CSV。預設值為 ","。

    • "startingPosition":(選用) 用於讀取。Kinesis 資料串流中要從中讀取資料的起始位置。可能的值為"latest""trim_horizon""earliest"、或模UTC式中格式的時間戳記字串 yyyy-mm-ddTHH:MM:SSZ (其中Z代表具有 +/-的UTC時區偏移量。 例如:「4 月 4 日上午 8 時至 4 時」)。預設值為 "latest"。注意:只有 AWS Glue 4.0 或更新版本"startingPosition"才支援UTC格式的時間戳記字串。

    • "failOnDataLoss":(選用) 如果有任何作用中的碎片遺失或過期,則任務失敗。預設值為 "false"

    • "awsSTSRoleARN":(選用) 用於讀取/寫入。角色的 Amazon 資源名稱 (ARN) 假設使用 AWS Security Token Service (AWS STS)。此角色必須具有描述或讀取 Kinesis 資料串流記錄操作的許可。存取不同帳戶中的資料串流時,您必須使用此參數。搭配 "awsSTSSessionName" 使用。

    • "awsSTSSessionName":(選用) 用於讀取/寫入。使用 AWS STS擔任角色之工作階段的識別符。存取不同帳戶中的資料串流時,您必須使用此參數。搭配 "awsSTSRoleARN" 使用。

    • "awsSTSEndpoint": (選擇性) 以假定角色連線到 Kinesis 時要使用的 AWS STS 端點。這允許在中使用區域 AWS STS 端點VPC,這對於默認全局端點是不可能的。

    • "maxFetchTimeInMs":(選用) 用於讀取。工作執行程式從 Kinesis 資料串流讀取目前批次記錄所花費的時間上限,以毫秒 (毫秒) 為單位。在這段時間內可以GetRecordsAPI撥打多個電話。預設值為 1000

    • "maxFetchRecordsPerShard":(選用) 用於讀取。每個微批次在 Kinesis 資料串流中,每個碎片可擷取的最大記錄數。注意:如果串流工作已讀取 Kinesis 的額外記錄 (在相同的 Get-record 呼叫中),用戶端可能會超過此限制。如果maxFetchRecordsPerShard需要嚴格,那麼它需要是maxRecordPerRead. 預設值為 100000

    • "maxRecordPerRead":(選用) 用於讀取。要從每個 getRecords 操作的 Kinesis 資料串流中擷取的記錄數量上限。預設值為 10000

    • "addIdleTimeBetweenReads":(選用) 用於讀取。增加兩個連續 getRecords 操作之間的時間延遲。預設值為 "False"。此選項僅在 Glue 2.0 及以上版本上才可設定。

    • "idleTimeBetweenReadsInMs":(選用) 用於讀取。兩個連續 getRecords 操作的最小延遲時間,以毫秒為單位指定。預設值為 1000。此選項僅在 Glue 2.0 及以上版本上才可設定。

    • "describeShardInterval":(選用) 用於讀取。兩次ListShardsAPI調用之間的最小時間間隔,以供腳本考慮重新分片。如需詳細資訊,請參閱 Amazon Kinesis Data Streams 開發人員指南中的重新分片的策略。預設值為 1s

    • "numRetries":(選用) 用於讀取。Kinesis Data Streams API 要求的重試次數上限。預設值為 3

    • "retryIntervalMs":(選用) 用於讀取。重試 Kinesis Data Streams 呼叫之前的冷卻期間 (以毫秒為單位指定)。API預設值為 1000

    • "maxRetryIntervalMs":(選用) 用於讀取。Kinesis Data Streams 呼叫兩次重試之間的最大冷卻期間 (以毫秒為單位指定)。API預設值為 10000

    • "avoidEmptyBatches":(選用) 用於讀取。避免建立空白微批次任務,方法是在批次開始之前檢查 Kinesis 資料串流中是否有未讀取的資料。預設值為 "False"

    • "schema": (當 inferSchema 設定為 false 時需要) 用於讀取。用於處理承載的結構描述。如果分類為 avro,提供的架構必須採用 Avro 架構格式。如果分類不是提供avro的模式必須是DDL模式格式。

      以下是架構範例。

      Example in DDL schema format
      `column1` INT, `column2` STRING , `column3` FLOAT
      Example in Avro schema format
      { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    • "inferSchema":(選用) 用於讀取。預設值為 'false'。如果設為 'true',將在執行時間時從 foreachbatch 承載偵測架構。

    • "avroSchema":(已棄用) 用於讀取。使用 Avro 格式時,用於指定 Avro 資料架構的參數。此參數現已棄用。使用 schema 參數。

    • "addRecordTimestamp":(選用) 用於讀取。當此選項設定為 'true' 時,資料輸出將包含一個名為 "__src_timestamp" 的額外資料欄,其指示串流收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。

    • "emitConsumerLagMetrics":(選用) 用於讀取。當該選項設置為 'true' 時,對於每個批次,它將發出流接收到的最舊記錄到達時間之間的持續時間的AWS Glue指標。 CloudWatch該度量標準的名稱是「膠合. 驅動程序. maxConsumerLagInMs」. 預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。

    • "fanoutConsumerARN":(選用) 用於讀取。中streamARN指定ARN之串流的 Kinesis 串流用戶。用於啟用 Kinesis 連線的強化廣發功能模式。如需有關使用強化廣發功能使用 Kinesis 串流的詳細資訊,請參閱 在 Kinesis 串流任務中使用強化廣發功能

    • "recordMaxBufferedTime" – (選用) 用於寫入。預設:1000 (毫秒)。在等待寫入時,記錄受到緩衝的最長時間。

    • "aggregationEnabled" – (選用) 用於寫入。預設:true。指定是否應在將記錄傳送至 Kinesis 前先彙整記錄。

    • "aggregationMaxSize" – (選用) 用於寫入。預設:51200 (位元組)。若記錄大於此限制,則其會略過彙整工具。請注意,Kinesis 會強制執行 50 KB 的記錄大小限制。若您將此值設定為超過 50 KB,Kinesis 將會拒絕過大的記錄。

    • "aggregationMaxCount" – (選用) 用於寫入。預設:4294967295。要匯入彙整記錄的項目數量上限。

    • "producerRateLimit" – (選用) 用於寫入。預設:150 (%)。作為後端限制的百分比來限制從單一生產者 (例如您的任務) 傳送的每個碎片輸送量。

    • "collectionMaxCount" – (選用) 用於寫入。預設:500。要打包到 PutRecords 請求中的最大項目數。

    • "collectionMaxSize" – (選用) 用於寫入。預設:5242880 (位元組)。與 PutRecords 請求一起發送的最大數據量。