步驟 2. 實作轉換邏輯 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

步驟 2. 實作轉換邏輯

注意

自訂視覺化轉換僅支援 Python 指令碼。不支援 Scala。

若要新增實作由 .json 組態檔案定義之函數的程式碼,建議將 Python 檔案放置在與 .json 檔案相同的位置,具有相同的名稱,但副檔名為 ".py"。AWS Glue Studio 會自動配對 .json 和 .py 檔案,讓您免於在組態檔案中指定 Python 檔案的路徑。

在 Python 檔案中,新增宣告的函數,設定具名參數並註冊用於 DynamicFrame。以下是 Python 檔案的範例:

from awsglue import DynamicFrame # self refers to the DynamicFrame to transform, # the parameter names must match the ones defined in the config # if it's optional, need to provide a default value def myTransform(self, email, phone, age=None, gender="", country="", promotion=False): resulting_dynf = # do some transformation on self return resulting_dynf DynamicFrame.myTransform = myTransform

建議使用 AWS Glue 筆記本,以最快的方式開發和測試 Python 程式碼。請參閱 AWS Glue Studio 中的筆記本入門

為了說明如何實作轉換邏輯,下列範例中的自訂視覺化轉換是用來篩選傳入資料的轉換,以便僅保留與美國特定州相關的資料。.json 檔案包含 functionName 的參數 custom_filter_state 和兩個引數 (類型為 "str" 的 "state" 和 "colName")。

範例組態 .json 檔案是:

{ "name": "custom_filter_state", "displayName": "Filter State", "description": "A simple example to filter the data to keep only the state indicated.", "functionName": "custom_filter_state", "parameters": [ { "name": "colName", "displayName": "Column name", "type": "str", "description": "Name of the column in the data that holds the state postal code" }, { "name": "state", "displayName": "State postal code", "type": "str", "description": "The postal code of the state whole rows to keep" } ] }
在 Python 中實作隨附指令碼
  1. 啟動 AWS Glue 筆記本並執行提供給要啟動之工作階段的初始儲存格。執行初始儲存格會建立必要的基本元件。

  2. 建立執行篩選的函數 (如範例中所述),並在 DynamicFrame 上註冊。複製下方的程式碼並貼入 AWS Glue 筆記本的儲存格中。

    from awsglue import DynamicFrame def custom_filter_state(self, colName, state): return self.filter(lambda row: row[colName] == state) DynamicFrame.custom_filter_state = custom_filter_state
  3. 建立或載入範例資料,以測試相同儲存格或新儲存格中的程式碼。如果在新儲存格中新增範例資料,請不要忘記執行該儲存格。例如:

    # A few of rows of sample data to test data_sample = [ {"state": "CA", "count": 4}, {"state": "NY", "count": 2}, {"state": "WA", "count": 3} ] df1 = glueContext.sparkSession.sparkContext.parallelize(data_sample).toDF() dynf1 = DynamicFrame.fromDF(df1, glueContext, None)
  4. 測試以使用不同的引數驗證 "custom_filter_state":

    螢幕擷取畫面顯示了 AWS Glue 筆記本中的儲存格,其中包含傳遞給 dynamicFrame.show 函數的引數。
  5. 執行多個測試後,使用 .py 副檔名儲存程式碼,並使用與 .json 檔案名稱相同的名稱命名 .py 檔案。.py 和 .json 檔案應該位於相同的轉換資料夾中。

    複製下方程式碼並貼入檔案中,再使用 .py 檔案副檔名重新命名。

    from awsglue import DynamicFrame def custom_filter_state(self, colName, state): return self.filter(lambda row: row[colName] == state) DynamicFrame.custom_filter_state = custom_filter_state
  6. 在 AWS Glue Studio 中,開啟視覺化任務,然後從可用的 Transforms (轉換) 清單中選取轉換,以將該轉換新增至任務。

    若要在 Python 指令碼程式碼中重複使用此轉換,請將 Amazon S3 路徑新增至「參考檔案路徑」下任務中的 .py 檔案,然後在指令碼中匯入 Python 檔案的名稱 (不含副檔名),方法為將 Python 檔案的名稱新增至檔案頂端。例如:import <檔案名稱 (不含副檔名)>