本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
AWS Glue 串流連線
以下各節提供有關如何在「 AWS Glue 串流」中使用連線的資訊。
使用卡夫卡連接
您可以使用卡夫卡連接讀取和寫入使用存儲在數據目錄表中的信息,或通過提供信息直接訪問數據流卡夫卡數據流。此連線支援卡夫卡叢集或 Amazon 管理的 Apache 卡夫卡叢集串流。您可以從卡夫卡讀取信息到火花 DataFrame,然後將其轉換為 Glue。 AWS DynamicFrame您可以以一JSON種格式寫信給 DynamicFrames 卡夫卡。如果您直接存取資料串流,則請使用這些選項來提供如何存取資料串流的相關資訊。
如果您使用getCatalogSource
或create_data_frame_from_catalog
使用卡夫卡流源中的記錄,getCatalogSink
或者write_dynamic_frame_from_catalog
將記錄寫入卡夫卡,並且該作業具有數據目錄數據庫和表名信息,則可以使用該信息來獲取從卡夫卡流源讀取一些基本參數。如果使用getSource
、、getCatalogSink
getSourceWithFormat
、createDataFrameFromOptions
或 getSinkWithFormat
create_data_frame_from_options
write_dynamic_frame_from_catalog
,則必須使用此處描述的連接選項來指定這些基本參數。
您可以使用類中的指定方法下面的參數指定卡夫卡的連接選項。GlueContext
-
Scala
-
connectionOptions
:與getSource
、createDataFrameFromOptions
、getSink
搭配使用 -
additionalOptions
:與getCatalogSource
、getCatalogSink
搭配使用。 -
options
:與getSourceWithFormat
、getSinkWithFormat
搭配使用。
-
-
Python
-
connection_options
:與create_data_frame_from_options
、write_dynamic_frame_from_options
搭配使用。 -
additional_options
:與create_data_frame_from_catalog
、write_dynamic_frame_from_catalog
搭配使用。 -
options
:與getSource
、getSink
搭配使用。
-
如需有關串流ETL工作的注意事項和限制,請參閱串流 ETL 注意事項和限制。
主題
設定 Kafka
連接到通過互聯網可用的卡夫卡流沒有 AWS 先決條件。
您可以建立 AWS Glue Kafka 連線來管理您的連線認證。如需詳細資訊,請參閱為 Apache Kafka 資料串流建立 AWS Glue 連線。在您的 AWS Glue 工作組態中,提供 connectionName
作為附加網絡連接,然後在您的方法調用中提供 connectionName
到參connectionName
數。
在某些情況下,您需要設定其他先決條件:
-
如果將 Apache Kafka 的 Amazon 受管串流與IAM身份驗證搭配使用,您將需要適當IAM的設定。
-
如果在 Amazon 內為 Apache 卡夫卡使用 Amazon 託管流VPC,則需要適當的 Amazon VPC 配置。您必須建立可提供 Amazon 連線資訊的 AWS Glue VPC 連線。您需要工作組態,才能將 AWS Glue 連線納入為其他網路連線。
如需串流ETL工作先決條件的詳細資訊,請參閱在 AWS Glue 中串流 ETL 任務。
範例:從 Kafka 串流讀取
搭配 forEachBatch 使用。
Kafka 串流來源範例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
範例:寫入卡夫卡串流
寫信給卡夫卡的例子:
使用該getSink
方法的示例:
data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()
使用該write_dynamic_frame.from_options
方法的示例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Kafka 連線選項參考
閱讀時,請使用以下連接選項"connectionType": "kafka"
:
-
"bootstrap.servers"
(必要) 啟動程序伺服器清單URLs,例如,asb-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
。必須在API呼叫中指定此選項,或在「資料目錄」的表格中繼資料中定義此選項。 -
"security.protocol"
(必要) 用來與代理程式通訊的協定。可能的值為"SSL"
或"PLAINTEXT"
。 -
"topicName"
(必要) 要訂閱的主題清單 (以逗號分隔)。您必須指定"topicName"
、"assign"
或"subscribePattern"
其中一個。 -
"assign"
: (必要) JSON 字串,指定要使用的特TopicPartitions
定項目。您必須指定"topicName"
、"assign"
或"subscribePattern"
其中一個。範例:'{"topicA":[0,1],"topicB":[2,4]}'
-
"subscribePattern"
:(必要) 識別要訂閱的主題清單的 Java regex 字串。您必須指定"topicName"
、"assign"
或"subscribePattern"
其中一個。範例:'topic.*'
-
"classification"
(必要) 記錄中資料使用的檔案格式。除非透過資料型錄提供,否則為必要。 -
"delimiter"
(選擇性) 值分隔符號使用時classification
為CSV。預設值為 ",
"。 -
"startingOffsets"
:(選用) 要從中讀取資料的 Kafka 主題的起始位置。可能的值為"earliest"
或"latest"
。預設值為"latest"
。 -
"startingTimestamp"
: (選用,僅適用於 AWS Glue 4.0 版或更新版本) Kafka 主題中要讀取資料的記錄時間戳記。可能的值是模UTC式中格式的時間戳記字串yyyy-mm-ddTHH:MM:SSZ
(其中Z
代表具有 +/-的UTC時區偏移量。 例如:「4 月 4 日上午 8 時至 4 時」)。注意: AWS Glue 串流指令集的 startingTimestamp「連線選項」清單中只能有一個 '' 或 '',包括這兩個屬性會導致工作失敗。startingOffsets
-
"endingOffsets"
:(選用) 批次查詢結束時的終點。可能的值是"latest"
或指定每個結束偏移量的JSON字符串TopicPartition
。對於字JSON符串,格式是
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
。值-1
作為偏移代表"latest"
。 -
"pollTimeoutMs"
:(選用) 在 Spark 任務執行器中從 Kafka 輪詢資料的逾時 (以毫秒為單位)。預設值為512
。 -
"numRetries"
:(選用) 擷取 Kafka 位移失敗之前,要重試的次數。預設值為3
。 -
"retryIntervalMs"
:(選用) 重試擷取 Kafka 偏移量之前等待的時間 (毫秒)。預設值為10
。 -
"maxOffsetsPerTrigger"
:(選用) 每個觸發間隔所處理之偏移數目上限的速率限制。指定的偏移總數會按比例跨topicPartitions
或不同磁碟區而分割。預設值為 null,這表示消費者讀取所有偏移,直到已知的最新偏移。 -
"minPartitions"
:(選用) 從 Kafka 讀取所需的分割區最小數量。預設值為 null,這表示 Spark 分割區的數量等於 Kafka 分割區的數量。 -
"includeHeaders"
:(選用) 是否包含 Kafka 標頭。當選項設定為「true」時,資料輸出將包含一個名為「glue_streaming_kafka_headers」的額外欄,其類型為Array[Struct(key: String, value: String)]
。預設值為 "false"。此選項能在 AWS Glue 3.0 版或更新版中使用。 -
"schema"
: (當 inferSchema 設定為 false 時需要) 用來處理承載的結構描述。如果分類為avro
,提供的架構必須採用 Avro 架構格式。如果分類不是提供avro
的模式必須是DDL模式格式。以下是架構範例。
-
"inferSchema"
:(選用) 預設值為 'false'。如果設為 'true',將在執行時間時從foreachbatch
承載偵測架構。 -
"avroSchema"
:(已棄用) 使用 Avro 格式時,用於指定 Avro 資料架構的參數。此參數現已棄用。使用schema
參數。 -
"addRecordTimestamp"
︰(選用) 當此選項設定為 'true' 時,資料輸出將包含一個名為 "__src_timestamp" 的額外資料欄,其指示主題收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。 -
"emitConsumerLagMetrics"
: (選擇性) 當選項設為 'true' 時,對於每個批次,它會發出主題接收到的最舊記錄到達時間之間的持續時間的AWS Glue指標。 CloudWatch該度量標準的名稱是「膠合. 驅動程序. maxConsumerLagInMs」. 預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。
寫入時,請使用以下連接選項"connectionType": "kafka"
:
-
"connectionName"
(必要)用於連接到卡夫卡集群的 AWS Glue 連接名稱(類似於卡夫卡源)。 -
"topic"
(必要) 如果主題欄存在,則除非設定了主題組態選項,否則在將指定資料列寫入 Kafka 時,會使用其值作為主題。也就是說,組topic
態選項會覆寫主題欄。 -
"partition"
(選擇性) 如果指定了有效的分割區編號,partition
將在傳送記錄時使用。如果沒有指定分區,但存
key
在一個分區,將使用密鑰的散列來選擇一個分區。如果
key
既不存在partition
也不存在,則當至少為分區產生 batch .size 字節時,將根據粘性分區這些更改選擇分區。 -
"key"
(選擇性) 如partition
果為 null,則用於分割。 -
"classification"
(選擇性) 記錄中資料所使用的檔案格式。我們只支持JSONCSV和阿夫羅。使用 Avro 格式,我們可以提供一個自 avroSchema 定義序列化,但請注意,這也需要在源代碼上提供反序列化。否則,默認情況下它使用 Apache AvroSchema 進行序列化。
此外,您可以根據需要通過更新卡夫卡生產者配置參數微調卡夫卡
但是,有一個小的拒絕列表的選項不會生效。如需詳細資訊,請參閱 Kafka 特定
使用 Kinesis 連接
您可以使用 Kinesis 連線讀取和寫入 Amazon Kinesis 資料串流,使用儲存在資料目錄表格中的資訊,或提供直接存取資料串流的資訊。您可以從 Kinesis 讀取信息到火花 DataFrame,然後將其轉換為 AWS Glue DynamicFrame。您可以 DynamicFrames 用一種JSON格式寫入 Kinesis。如果您直接存取資料串流,則請使用這些選項來提供如何存取資料串流的相關資訊。
如果您使用 getCatalogSource
或 create_data_frame_from_catalog
來取用來自 Kinesis 串流來源的記錄,則該任務具有 Data Catalog 資料庫和資料表名稱資訊,並可以使用它來獲取一些從 Kinesis 串流來源讀取的基本參數。如果使用 getSource
、getSourceWithFormat
、createDataFrameFromOptions
或 create_data_frame_from_options
,則您必須使用此處描述的連線選項來指定這些基本參數。
您可以使用 GlueContext
類別中指定方法的下列引數來指定 Kinesis 的連線選項。
-
Scala
-
connectionOptions
:與getSource
、createDataFrameFromOptions
、getSink
搭配使用 -
additionalOptions
:與getCatalogSource
、getCatalogSink
搭配使用。 -
options
:與getSourceWithFormat
、getSinkWithFormat
搭配使用。
-
-
Python
-
connection_options
:與create_data_frame_from_options
、write_dynamic_frame_from_options
搭配使用。 -
additional_options
:與create_data_frame_from_catalog
、write_dynamic_frame_from_catalog
搭配使用。 -
options
:與getSource
、getSink
搭配使用。
-
如需有關串流ETL工作的注意事項和限制,請參閱串流 ETL 注意事項和限制。
設定 Kinesis
若要連線到 AWS Glue Spark 工作中的 Kinesis 資料串流,您需要一些先決條件:
如果讀取, AWS Glue 工作必須具有 Kinesis 資料串流的讀取存取等級IAM權限。
如果要寫入, AWS Glue 工作必須具有 Kinesis 資料串流的寫入存取層級IAM權限。
在某些情況下,您需要設定其他先決條件:
-
如果您的 AWS Glue 任務設定了「其他網路連線」(通常是用來連接到其他資料集),而其中一個連線提供 Amazon VPC 網路選項,則會引導您的任務透過 Amazon 進行通訊VPC。在這種情況下,您還需要設定 Kinesis 資料串流,以便透過 Amazon VPC 進行通訊。您可以在 Amazon VPC 和 Kinesis 資料串流之間建立介面VPC端點來執行此操作。如需詳細資訊,請參閱將 Kinesis Data Streams 與介面VPC端點搭配使用。
-
在另一個帳戶中指定 Amazon Kinesis Data Streams 時,您必須設定角色和政策以允許跨帳戶存取。如需詳細資訊,請參閱範例:從不同帳戶中的 Kinesis 串流讀取。
如需串流ETL工作先決條件的詳細資訊,請參閱在 AWS Glue 中串流 ETL 任務。
從 Kinesis 讀取
範例:從 Kinesis 串流讀取
搭配 forEachBatch 使用。
Amazon Kinesis 串流來源範例:
kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
寫入 Kinesis
範例:寫入 Kinesis 串流
搭配 forEachBatch 使用。您 DynamicFrame 將以一種JSON格式寫入流。如果任務在數次重試後仍無法寫入,便會失敗。依預設,每 DynamicFrame 筆記錄都會個別傳送至 Kinesis 串流。您可以使用 aggregationEnabled
和關聯的參數來設定此行為。
從串流任務寫入 Amazon Kinesis 的範例:
Kinesis 連線參數
指定 Amazon Kinesis Data Streams 的連線選項。
針對 Kinesis 串流資料來源使用下列的連線選項:
-
"streamARN"
(必要) 用於讀取/寫入。Kinesis 資料串流ARN的。 -
"classification"
(讀取時為必要) 用於讀取。記錄中資料使用的檔案格式。除非透過資料型錄提供,否則為必要。 -
"streamName"
– (選用) 用於讀取。要從中讀取的 Kinesis 資料串流名稱。與endpointUrl
搭配使用。 -
"endpointUrl"
– (選用) 用於讀取。預設值:"https://kinesis.us-east-1.amazonaws.com"。Kinesis 串流的 AWS 端點。除非您要連線到特殊區域,否則無需變更此設定。 -
"partitionKey"
– (選用) 用於寫入。在產生記錄時使用的 Kinesis 分割區索引鍵。 -
"delimiter"
(選用) 用於讀取。當classification
是時使用的值分隔符號CSV。預設值為 ",
"。 -
"startingPosition"
:(選用) 用於讀取。Kinesis 資料串流中要從中讀取資料的起始位置。可能的值為"latest"
、"trim_horizon"
"earliest"
、或模UTC式中格式的時間戳記字串yyyy-mm-ddTHH:MM:SSZ
(其中Z
代表具有 +/-的UTC時區偏移量。 例如:「4 月 4 日上午 8 時至 4 時」)。預設值為"latest"
。注意:只有 AWS Glue 4.0 或更新版本"startingPosition"
才支援UTC格式的時間戳記字串。 -
"failOnDataLoss"
:(選用) 如果有任何作用中的碎片遺失或過期,則任務失敗。預設值為"false"
。 -
"awsSTSRoleARN"
:(選用) 用於讀取/寫入。角色的 Amazon 資源名稱 (ARN) 假設使用 AWS Security Token Service (AWS STS)。此角色必須具有描述或讀取 Kinesis 資料串流記錄操作的許可。存取不同帳戶中的資料串流時,您必須使用此參數。搭配"awsSTSSessionName"
使用。 -
"awsSTSSessionName"
:(選用) 用於讀取/寫入。使用 AWS STS擔任角色之工作階段的識別符。存取不同帳戶中的資料串流時,您必須使用此參數。搭配"awsSTSRoleARN"
使用。 -
"awsSTSEndpoint"
: (選擇性) 以假定角色連線到 Kinesis 時要使用的 AWS STS 端點。這允許在中使用區域 AWS STS 端點VPC,這對於默認全局端點是不可能的。 -
"maxFetchTimeInMs"
:(選用) 用於讀取。工作執行程式從 Kinesis 資料串流讀取目前批次記錄所花費的時間上限,以毫秒 (毫秒) 為單位。在這段時間內可以GetRecords
API撥打多個電話。預設值為1000
。 -
"maxFetchRecordsPerShard"
:(選用) 用於讀取。每個微批次在 Kinesis 資料串流中,每個碎片可擷取的最大記錄數。注意:如果串流工作已讀取 Kinesis 的額外記錄 (在相同的 Get-record 呼叫中),用戶端可能會超過此限制。如果maxFetchRecordsPerShard
需要嚴格,那麼它需要是maxRecordPerRead
. 預設值為100000
。 -
"maxRecordPerRead"
:(選用) 用於讀取。要從每個getRecords
操作的 Kinesis 資料串流中擷取的記錄數量上限。預設值為10000
。 -
"addIdleTimeBetweenReads"
:(選用) 用於讀取。增加兩個連續getRecords
操作之間的時間延遲。預設值為"False"
。此選項僅在 Glue 2.0 及以上版本上才可設定。 -
"idleTimeBetweenReadsInMs"
:(選用) 用於讀取。兩個連續getRecords
操作的最小延遲時間,以毫秒為單位指定。預設值為1000
。此選項僅在 Glue 2.0 及以上版本上才可設定。 -
"describeShardInterval"
:(選用) 用於讀取。兩次ListShards
API調用之間的最小時間間隔,以供腳本考慮重新分片。如需詳細資訊,請參閱 Amazon Kinesis Data Streams 開發人員指南中的重新分片的策略。預設值為1s
。 -
"numRetries"
:(選用) 用於讀取。Kinesis Data Streams API 要求的重試次數上限。預設值為3
。 -
"retryIntervalMs"
:(選用) 用於讀取。重試 Kinesis Data Streams 呼叫之前的冷卻期間 (以毫秒為單位指定)。API預設值為1000
。 -
"maxRetryIntervalMs"
:(選用) 用於讀取。Kinesis Data Streams 呼叫兩次重試之間的最大冷卻期間 (以毫秒為單位指定)。API預設值為10000
。 -
"avoidEmptyBatches"
:(選用) 用於讀取。避免建立空白微批次任務,方法是在批次開始之前檢查 Kinesis 資料串流中是否有未讀取的資料。預設值為"False"
。 -
"schema"
: (當 inferSchema 設定為 false 時需要) 用於讀取。用於處理承載的結構描述。如果分類為avro
,提供的架構必須採用 Avro 架構格式。如果分類不是提供avro
的模式必須是DDL模式格式。以下是架構範例。
-
"inferSchema"
:(選用) 用於讀取。預設值為 'false'。如果設為 'true',將在執行時間時從foreachbatch
承載偵測架構。 -
"avroSchema"
:(已棄用) 用於讀取。使用 Avro 格式時,用於指定 Avro 資料架構的參數。此參數現已棄用。使用schema
參數。 -
"addRecordTimestamp"
:(選用) 用於讀取。當此選項設定為 'true' 時,資料輸出將包含一個名為 "__src_timestamp" 的額外資料欄,其指示串流收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。 -
"emitConsumerLagMetrics"
:(選用) 用於讀取。當該選項設置為 'true' 時,對於每個批次,它將發出流接收到的最舊記錄到達時間之間的持續時間的AWS Glue指標。 CloudWatch該度量標準的名稱是「膠合. 驅動程序. maxConsumerLagInMs」. 預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。 -
"fanoutConsumerARN"
:(選用) 用於讀取。中streamARN
指定ARN之串流的 Kinesis 串流用戶。用於啟用 Kinesis 連線的強化廣發功能模式。如需有關使用強化廣發功能使用 Kinesis 串流的詳細資訊,請參閱 在 Kinesis 串流任務中使用強化廣發功能。 -
"recordMaxBufferedTime"
– (選用) 用於寫入。預設:1000 (毫秒)。在等待寫入時,記錄受到緩衝的最長時間。 -
"aggregationEnabled"
– (選用) 用於寫入。預設:true。指定是否應在將記錄傳送至 Kinesis 前先彙整記錄。 -
"aggregationMaxSize"
– (選用) 用於寫入。預設:51200 (位元組)。若記錄大於此限制,則其會略過彙整工具。請注意,Kinesis 會強制執行 50 KB 的記錄大小限制。若您將此值設定為超過 50 KB,Kinesis 將會拒絕過大的記錄。 -
"aggregationMaxCount"
– (選用) 用於寫入。預設:4294967295。要匯入彙整記錄的項目數量上限。 -
"producerRateLimit"
– (選用) 用於寫入。預設:150 (%)。作為後端限制的百分比來限制從單一生產者 (例如您的任務) 傳送的每個碎片輸送量。 -
"collectionMaxCount"
– (選用) 用於寫入。預設:500。要打包到 PutRecords 請求中的最大項目數。 -
"collectionMaxSize"
– (選用) 用於寫入。預設:5242880 (位元組)。與 PutRecords 請求一起發送的最大數據量。