Os exemplos a seguir realizam transformações equivalentes. No entanto, o segundo exemplo (SparkSQL) é o mais limpo e eficiente, seguido pela UDF do pandas e, finalmente, pelo mapeamento de baixo nível no primeiro exemplo. O exemplo a seguir é um exemplo completo de uma transformação simples para somar duas colunas:
from awsglue import DynamicFrame
# You can have other auxiliary variables, functions or classes on this file, it won't affect the runtime
def record_sum(rec, col1, col2, resultCol):
rec[resultCol] = rec[col1] + rec[col2]
return rec
# The number and name of arguments must match the definition on json config file
# (expect self which is the current DynamicFrame to transform
# If an argument is optional, you need to define a default value here
# (resultCol in this example is an optional argument)
def custom_add_columns(self, col1, col2, resultCol="result"):
# The mapping will alter the columns order, which could be important
fields = [field.name for field in self.schema()]
if resultCol not in fields:
# If it's a new column put it at the end
fields.append(resultCol)
return self.map(lambda record: record_sum(record, col1, col2, resultCol)).select_fields(paths=fields)
# The name we assign on DynamicFrame must match the configured "functionName"
DynamicFrame.custom_add_columns = custom_add_columns
O exemplo a seguir é uma transformação equivalente usando a API do SparkSQL.
from awsglue import DynamicFrame
# The number and name of arguments must match the definition on json config file
# (expect self which is the current DynamicFrame to transform
# If an argument is optional, you need to define a default value here
# (resultCol in this example is an optional argument)
def custom_add_columns(self, col1, col2, resultCol="result"):
df = self.toDF()
return DynamicFrame.fromDF(
df.withColumn(resultCol, df[col1] + df[col2]) # This is the conversion logic
, self.glue_ctx, self.name)
# The name we assign on DynamicFrame must match the configured "functionName"
DynamicFrame.custom_add_columns = custom_add_columns
O exemplo a seguir usa as mesmas transformações, mas usando uma UDF do pandas, que é mais eficiente do que usar uma UDF simples. Para obter mais informações sobre como escrever UDFs panda, consulte a documentação do Apache Spark SQL
from awsglue import DynamicFrame
import pandas as pd
from pyspark.sql.functions import pandas_udf
# The number and name of arguments must match the definition on json config file
# (expect self which is the current DynamicFrame to transform
# If an argument is optional, you need to define a default value here
# (resultCol in this example is an optional argument)
def custom_add_columns(self, col1, col2, resultCol="result"):
@pandas_udf("integer") # We need to declare the type of the result column
def add_columns(value1: pd.Series, value2: pd.Series) → pd.Series:
return value1 + value2
df = self.toDF()
return DynamicFrame.fromDF(
df.withColumn(resultCol, add_columns(col1, col2)) # This is the conversion logic
, self.glue_ctx, self.name)
# The name we assign on DynamicFrame must match the configured "functionName"
DynamicFrame.custom_add_columns = custom_add_columns