在 AWS Glue 中使用 CSV 格式 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 AWS Glue 中使用 CSV 格式

AWS Glue 會從各來源擷取資料,並將資料寫入以各種資料格式儲存和傳輸的目標。如果您的資料以 CSV 資料格式存放或傳輸,本文件將向您介紹在 AWS Glue 中使用資料的可用功能。

AWS Glue 支援使用逗號分隔值 (CSV) 格式。此格式是一種極小的、以列為基礎的資料格式。CSV 通常不會嚴格符合標準,但您可以參閱 RFC 4180RFC 7111 來了解詳細資訊。

您可以使用 AWS Glue 從 Amazon S3 和串流來源讀取 CSV,並將 CSV 寫入 Amazon S3。您可以讀取和寫入來自 S3 的包含 CSV 檔案的 bzipgzip 封存。您可以在 S3 連線參數 上設定壓縮行為,而不是在本頁討論的組態中設定。

下表顯示支援 CSV 格式選項的常見 AWS Glue 功能。

讀取 寫入 串流讀取 對小型檔案進行分組 任務書籤
支援 支援 支援 支援 支援

範例:從 S3 讀取 CSV 檔案或資料夾

先決條件:您需要指向希望讀取的 CSV 檔案或資料夾的 S3 路徑 (s3path)。

組態:在您的函數選項中,指定 format="csv"。在您的 connection_options 中,使用 paths 鍵來指定 s3path。您可以在 connection_options 中設定讀取器與 S3 互動的方式。如需詳細資訊,請參閱 AWS Glue 中 ETL 的連線類型和選項:S3 連線參數。您可以在 format_options 中設定讀取器解譯 CSV 的方式。如需詳細資訊,請參閱 CSV 組態參考

以下 AWS Glue ETL 指令碼顯示從 S3 讀取 CSV 檔案或資料夾的流程。

我們透過 optimizePerformance 組態鍵為常見工作流提供經過效能最佳化的自訂 CSV 讀取器。若要判斷此讀取器是否適合您的工作負載,請參閱 使用向量化 SIMD CSV 讀取器最佳化讀取效能

Python

在此範例中,使用 create_dynamic_frame.from_options 方法。

# Example: Read CSV from S3 # For show, we handle a CSV with a header row. Set the withHeader option. # Consider whether optimizePerformance is right for your workflow. from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://s3path"]}, format="csv", format_options={ "withHeader": True, # "optimizePerformance": True, }, )

您也可以在指令碼中使用 DataFrames (pyspark.sql.DataFrame)。

dataFrame = spark.read\ .format("csv")\ .option("header", "true")\ .load("s3://s3path")
Scala

在此範例中,使用 getSourceWithFormat 操作。

// Example: Read CSV from S3 // For show, we handle a CSV with a header row. Set the withHeader option. // Consider whether optimizePerformance is right for your workflow. import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dynamicFrame = glueContext.getSourceWithFormat( formatOptions=JsonOptions("""{"withHeader": true}"""), connectionType="s3", format="csv", options=JsonOptions("""{"paths": ["s3://s3path"], "recurse": true}""") ).getDynamicFrame() } }

您也可以在指令碼中使用 DataFrames (org.apache.spark.sql.DataFrame)。

val dataFrame = spark.read .option("header","true") .format("csv") .load("s3://s3path“)

範例:將 CSV 檔案和資料夾寫入 S3

先決條件:您需要已初始化的 DataFrame (dataFrame) 或 DynamicFrame (dynamicFrame)。您還需要預期的 S3 輸出路徑 s3path

組態:在您的函數選項中,指定 format="csv"。在您的 connection_options 中,使用 paths 鍵來指定 s3path。您可以在 connection_options 中設定寫入器與 S3 的互動方式。如需詳細資訊,請參閱 AWS Glue 中 ETL 的連線類型和選項:S3 連線參數。您可以在 format_options 中設定操作如何寫入檔案內容。如需詳細資訊,請參閱 CSV 組態參考。以下 AWS Glue ETL 指令碼顯示將 CSV 檔案和資料夾寫入 S3 的流程。

Python

在此範例中,使用 write_dynamic_frame.from_options 方法。

# Example: Write CSV to S3 # For show, customize how we write string type values. Set quoteChar to -1 so our values are not quoted. from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) glueContext.write_dynamic_frame.from_options( frame=dynamicFrame, connection_type="s3", connection_options={"path": "s3://s3path"}, format="csv", format_options={ "quoteChar": -1, }, )

您也可以在指令碼中使用 DataFrames (pyspark.sql.DataFrame)。

dataFrame.write\ .format("csv")\ .option("quote", None)\ .mode("append")\ .save("s3://s3path")
Scala

在此範例中,使用 getSinkWithFormat 方法。

// Example: Write CSV to S3 // For show, customize how we write string type values. Set quoteChar to -1 so our values are not quoted. import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getSinkWithFormat( connectionType="s3", options=JsonOptions("""{"path": "s3://s3path"}"""), format="csv" ).writeDynamicFrame(dynamicFrame) } }

您也可以在指令碼中使用 DataFrames (org.apache.spark.sql.DataFrame)。

dataFrame.write .format("csv") .option("quote", null) .mode("Append") .save("s3://s3path")

CSV 組態參考

您都可以使用下列 format_options,無論 AWS Glue 程式庫在何處指定 format="csv"

  • separator – 指定分隔符號字元。預設值為逗號,但您仍可指定任何其他字元。

    • 類型:文字,預設:","

  • escaper – 指定用於逸出的字元。只有在讀取 (而非寫入) CSV 檔案時,才會使用此選項。若啟用,後面緊接的字元會維持現狀,一小組已知的逸出字元 (\n\r\t\0) 除外。

    • 類型:文字,預設:

  • quoteChar – 指定用於引用的字元。預設為雙引號。將之設為 -1 可完全關閉引用功能。

    • 類型:文字,預設:'"'

  • multiLine – 指定單項記錄是否可以跨越多行。當欄位內含引用的新行字元時,可能就會發生這種情況。若有任何記錄跨越多行,請務必將此選項設為 True。啟用 multiLine 可能會降低性能,因為在剖析時需要更加謹慎的檔案分割。

    • 類型:布林值,預設:false

  • withHeader – 指定是否要將第一行做為標頭。DynamicFrameReader 類別可使用該選項。

    • 類型:布林值,預設:false

  • writeHeader – 指定是否要將標頭寫入輸出之中。DynamicFrameWriter 類別可使用該選項。

    • 類型:布林值,預設:true

  • skipFirst – 指定是否要略過第一個資料行。

    • 類型:布林值,預設:false

  • optimizePerformance – 指定是否要使用進階 SIMD CSV 讀取器,以及以 Apache Arrow 為基礎的直欄式記憶體格式。僅適用於 AWS Glue 3.0+。

    • 類型:布林值,預設:false

  • strictCheckForQuoting:寫入 CSV 時,Glue 可能會在解讀為字串的值中加上引號。這樣做是為了防止寫出的內容模糊。為了節省時間,決定要寫入的內容時,Glue 可能會在不需要引號的某些情況下進行引用。啟用嚴格檢查將執行更密集的運算,並且僅在必要時引用。僅適用於 AWS Glue 3.0+。

    • 類型:布林值,預設:false

使用向量化 SIMD CSV 讀取器最佳化讀取效能

AWS Glue 3.0 版本添加了一個經最佳化的 CSV 讀取器,與以資料行為基礎 CSV 讀取器相比,可以大幅加快整體任務效能。

經最佳化的閱讀器:

  • 使用 CPU SIMD 指令從磁碟讀取

  • 立即以直欄式格式 (Apache Arrow) 將記錄寫入記憶體

  • 將記錄分成多個批次

這樣可以節省稍後將記錄分批或轉換為直欄式格式的處理時間。有些範例是變更結構描述或依資料行擷取資料時。

要使用經最佳化的讀取器,請在 format_options 或資料表屬性中將 "optimizePerformance" 設為 true

glueContext.create_dynamic_frame.from_options( frame = datasource1, connection_type = "s3", connection_options = {"paths": ["s3://s3path"]}, format = "csv", format_options={ "optimizePerformance": True, "separator": "," }, transformation_ctx = "datasink2")
向量化 CSV 讀取器的限制

請注意向量化 CSV 讀取器的下列限制:

  • 它不支援 multiLineescaper 格式選項。它使用雙引號字元 '"' 的預設 escaper。設定這些選項後,AWS Glue 會自動回退到使用以列為基礎的 CSV 讀取器。

  • 它不支援建立具有 ChoiceType 的 DynamicFrame。

  • 它不支援建立具有錯誤記錄的 DynamicFrame。

  • 它不支援讀取含有多位元組字元 (例如日文或中文字元) 的 CSV 檔案。