ステップ 2. 変換ロジックを実装する
注記
カスタムビジュアル変換は Python スクリプトのみをサポートしています。Scala はサポートされていません。
.json 設定ファイルで定義された関数を実装するコードを追加するには、Python ファイルを .json ファイルと同じ場所に同じ名前で「.py」拡張子を付けて配置することをお勧めします。AWS Glue Studio では、.json ファイルと .py ファイルが自動的にペアリングされるため、設定ファイルで Python ファイルのパスを指定する必要はありません。
Python ファイルに、指定されたパラメータを設定した宣言済み関数を追加し、その関数を DynamicFrame
で使用するために登録します。Python ファイルの例を次に示します。
from awsglue import DynamicFrame # self refers to the DynamicFrame to transform, # the parameter names must match the ones defined in the config # if it's optional, need to provide a default value def myTransform(self, email, phone, age=None, gender="", country="", promotion=False): resulting_dynf = # do some transformation on self return resulting_dynf DynamicFrame.myTransform = myTransform
Python コードの開発とテストを最も早く行うために、AWS Glue ノートブックの使用をお勧めします。「AWS Glue Studio 中でのノートブックの使用開始」を参照してください。
変換ロジックの実装方法を説明するために、以下の例でのカスタムビジュアル変換は、入力データをフィルタリングして特定の米国の州に関連するデータのみを保持する変換になっています。.json ファイルには functionName
のパラメータが custom_filter_state
として含まれ、2 つの引数 (「str」型の「state」と「colName」) が含まれています。
.json 設定ファイルの例は次のとおりです。
{ "name": "custom_filter_state", "displayName": "Filter State", "description": "A simple example to filter the data to keep only the state indicated.", "functionName": "custom_filter_state", "parameters": [ { "name": "colName", "displayName": "Column name", "type": "str", "description": "Name of the column in the data that holds the state postal code" }, { "name": "state", "displayName": "State postal code", "type": "str", "description": "The postal code of the state whole rows to keep" } ] }
Python でコンパニオンスクリプトを実装するには
-
AWS Glue ノートブックを起動し、セッションを開始するために提供された最初のセルを実行します。最初のセルを実行すると、必要な基本コンポーネントが作成されます。
-
例で説明されているようにフィルタリングを実行する関数を作成し、
DynamicFrame
に登録します。以下のコードをコピーして、AWS Glue ノートブックのセルに貼り付けます。from awsglue import DynamicFrame def custom_filter_state(self, colName, state): return self.filter(lambda row: row[colName] == state) DynamicFrame.custom_filter_state = custom_filter_state
-
サンプルデータを作成またはロードして、同じセルまたは新しいセルでコードをテストします。サンプルデータを新しいセルに追加する場合は、セルを実行することを忘れないでください。例:
# A few of rows of sample data to test data_sample = [ {"state": "CA", "count": 4}, {"state": "NY", "count": 2}, {"state": "WA", "count": 3} ] df1 = glueContext.sparkSession.sparkContext.parallelize(data_sample).toDF() dynf1 = DynamicFrame.fromDF(df1, glueContext, None)
-
さまざまな引数で次のようにテストして、「custom_filter_state」を検証します。
-
いくつかのテストを実行した後、.py 拡張子を付けてコードを保存し、.py ファイルに .json ファイル名と同じ名前を付けます。.py ファイルと .json ファイルは同じ変換フォルダにある必要があります。
次のコードをコピーしてファイルに貼り付け、.py ファイル拡張子に変更します。
from awsglue import DynamicFrame def custom_filter_state(self, colName, state): return self.filter(lambda row: row[colName] == state) DynamicFrame.custom_filter_state = custom_filter_state
-
AWS Glue Studio でビジュアルジョブを開き、使用可能な [Transforms] (変換) のリストから変換を選択して変換をジョブに追加します。
この変換を Python スクリプトコードで再利用するには、Amazon S3 パスを [Referenced files path] (参照されるファイルパス) の下のジョブの .py ファイルに追加し、スクリプト内の python ファイルの名前 (拡張子なし) をファイルの先頭に追加してインポートします。例えば、
import
<ファイル名 (拡張子なし)>