Uso de marcadores de trabajo
AWS Glue para Spark usa marcadores de flujo de trabajo para realizar un seguimiento de los datos que ya se han procesado. Para obtener un resumen de la característica de marcadores de trabajo y lo que admite, consulte Seguimiento de los datos procesados mediante marcadores de trabajo. Al programar un trabajo de AWS Glue con marcadores, tiene acceso a una flexibilidad que no está disponible en los trabajos visuales.
-
Al leer desde la JDBC, puede especificar las columnas que se van a utilizar como claves de marcador en el script de AWS Glue.
-
Puede elegir qué
transformation_ctx
aplicar a cada llamada de método.
Siempre, al principio de un script, haga una llamada a job.init
y, al final, a job.commit
con los parámetros configurados de manera apropiada. Estas dos características se utilizan para inicializar el servicio de marcadores y actualizar el servicio con el cambio de estado. Los favoritos no funcionarán sin llamarlos.
Especifique las claves de los marcadores
En el caso de los flujos de trabajo de JDBC, el marcador realiza un seguimiento de las filas que ha leído el trabajo al comparar los valores de los campos clave con un valor marcado. Esto no es necesario ni aplicable a los flujos de trabajo de Amazon S3. Al escribir un script AWS Glue sin el editor visual, puede especificar con marcadores qué columna desea rastrear. También puede especificar varias columnas. Se permiten espacios en la secuencia de valores al especificar las claves de marcadores definidas por el usuario.
aviso
Si se utilizan claves de marcador definidas por el usuario, deben aumentar o disminuir de forma monotónica estrictamente. Al seleccionar campos adicionales para una clave compuesta, los campos para conceptos como “versiones secundarias” o “números de revisión” no cumplen este criterio, ya que sus valores se reutilizan en todo el conjunto de datos.
Puede especificar jobBookmarkKeys
y jobBookmarkKeysSortOrder
de las siguientes maneras:
-
create_dynamic_frame.from_catalog
: utiliceadditional_options
. -
create_dynamic_frame.from_options
: utiliceconnection_options
.
Contexto de transformación
Muchos de los métodos de marco dinámico de AWS Glue PySpark incluyen un parámetro opcional denominado transformation_ctx
, que es un identificador único para la instancia de operador de ETL. El parámetro transformation_ctx
se utiliza para identificar la información de estado dentro de un marcador de trabajo para el operador determinado. En concreto, AWS Glue utiliza transformation_ctx
para indexar la clave del estado de marcador.
aviso
transformation_ctx
sirve de clave para realizar búsquedas en el estado de favorito de un origen específico del script. Para que el favorito funcione correctamente, siempre se debe conservar la coherencia entre el origen y el elemento transformation_ctx
asociado. Modificar la propiedad de origen o cambiarle el nombre a transformation_ctx
puede hacer que el favorito anterior deje de ser válido y que el filtrado basado en marcas temporales no produzca el resultado correcto.
Para que los marcadores de flujo de trabajo funcionen correctamente, habilite el parámetro de marcador de trabajo y establezca el parámetro transformation_ctx
. Si no pasa el parámetro transformation_ctx
, no se habilitan los marcadores de trabajo para un marco dinámico o una tabla que se usen en el método. Por ejemplo, si tiene un trabajo de ETL que lee y combina dos orígenes de Amazon S3, puede decidir transferir el parámetro transformation_ctx
únicamente a los métodos para los que desea habilitar marcadores. Si restablece el marcador de trabajo de un trabajo, se restablecen todas las transformaciones que están asociadas al trabajo sea cual sea el transformation_ctx
utilizado.
Para obtener más información acerca de la clase DynamicFrameReader
, consulte Clase DynamicFrameReader. Para obtener más información acerca de las extensiones de PySpark, consulte Referencia de las extensiones de PySpark de AWS Glue.
Ejemplos
A continuación, se muestra un ejemplo de un script generado para un origen de datos de Amazon S3. Las partes del script que son necesarias para utilizar marcadores de trabajo se muestran en cursiva. Para obtener más información sobre estos elementos, consulte Clase GlueContext API y Clase DynamicFrameWriter API.
# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0" ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2" ) job.commit()
A continuación se muestra un ejemplo de un script generado para un origen JDBC. La tabla de origen es una tabla de empleados con la columna empno
como clave principal. Aunque el trabajo utiliza una clave principal secuencial como clave de marcador predeterminada, si no se especifica ninguna clave de marcador, porque empno
no es necesariamente secuencial (podría haber brechas en los valores), esta clave no califica como clave de marcador predeterminada. Por lo tanto, el script designa empno
explícitamente como la clave de marcador. Esa parte del código se muestra en cursiva.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"} ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2" ) job.commit()