Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utilisation des marque-pages de tâche
AWS Glue pour Spark utilise les signets de tâche pour suivre les données qui ont déjà été traitées. Pour un résumé de la fonctionnalité des signets de tâches et de ce qu'elle prend en charge, consultez Suivi des données traitées à l'aide de signets de tâche. Lorsque vous programmez une tâche AWS Glue avec des signets, vous bénéficiez d'une flexibilité qui n'est pas disponible dans les tâches visuelles.
-
Lors de la lecture à partir de JDBC, vous pouvez spécifier la ou les colonnes à utiliser comme clés de signet dans votre script AWS Glue.
-
Vous pouvez choisir la
transformation_ctx
à appliquer à chaque appel de méthode.
Appelez toujours job.init
au début et job.commit
à la fin du script avec les paramètres correctement configurés. Ces deux fonctions initialisent le service de signet et mettent à jour le changement d'état vers le service. Les marque-pages ne fonctionnent pas s’ils ne sont pas appelés.
Spécifier les clés des signets
Pour les flux de travail JDBC, le signet assure le suivi des lignes lues par votre tâche en comparant les valeurs des champs clés à la valeur d'un signet. Cela n'est ni nécessaire ni applicable aux flux de travail Amazon S3. Lorsque vous écrivez un script AWS Glue sans l'éditeur visuel, vous pouvez spécifier la colonne à suivre à l'aide de signets. Vous pouvez aussi spécifier plusieurs colonnes. Des écarts dans la séquence de valeurs sont autorisés lorsque vous spécifiez des clés de signet définies par l'utilisateur.
Avertissement
Si des clés de signets définies par l'utilisateur sont utilisées, elles doivent toutes croître ou diminuer de façon monotone. Lorsque vous sélectionnez des champs supplémentaires pour une clé composée, les champs correspondants à des concepts tels que les « versions mineures » ou « numéros de révision » ne répondent pas à ce critère, car leurs valeurs sont réutilisées dans l'ensemble de votre jeu de données.
Vous pouvez spécifier jobBookmarkKeys
et jobBookmarkKeysSortOrder
de la manière suivante :
-
create_dynamic_frame.from_catalog
– Utiliseradditional_options
. -
create_dynamic_frame.from_options
– Utiliserconnection_options
.
Contexte de transformation
De nombreuses méthodes de trame AWS Glue PySpark dynamique incluent un paramètre facultatif nommétransformation_ctx
, qui est un identifiant unique pour l'instance d'opérateur ETL. Le paramètre transformation_ctx
permet d'identifier les informations d'état dans un marque-page de tâche pour l'opérateur donné. Plus précisément, AWS Glue utilise transformation_ctx
pour indexer la clé sur l'état du marque-page.
Avertissement
Le paramètre transformation_ctx
sert de clé pour rechercher dans l'état du marque-page une source spécifique dans votre script. Pour que le marque-page fonctionne correctement, vous devez toujours conserver la source et la cohérence du paramètre transformation_ctx
associée. La modification de la propriété source ou le renommage du paramètre transformation_ctx
peut rendre le marque-page précédent invalide et le filtrage basé sur l'horodatage peut ne pas donner le bon résultat.
Pour que les marque-pages de tâche fonctionnent correctement, activez le paramètre de signet de tâche et définissez le paramètre transformation_ctx
. Si vous ne transmettez pas le paramètre transformation_ctx
, les marque-pages de tâche ne sont pas activés pour une trame dynamique ou un tableau utilisé dans la méthode. Par exemple, si vous avez une tâche ETL qui lit et joint deux sources Amazon S3, vous pouvez choisir de transmettre le paramètre transformation_ctx
uniquement aux méthodes qui doivent activer les marque-pages. Si vous réinitialisez le marque-page de tâche d'une tâche, toutes les transformations associées à cette tâche sont réinitialisées, quel que soit le paramètre transformation_ctx
utilisé.
Pour plus d'informations sur la classe DynamicFrameReader
, consultez DynamicFrameReader classe. Pour plus d'informations sur les PySpark extensions, consultezAWS Référence des PySpark extensions Glue.
Exemples
Voici un exemple de script généré pour une source de données Amazon S3. Les parties du script nécessaires à l'utilisation des signets de tâche sont indiquées en italique. Pour plus d'informations sur ces éléments, consultez l'API GlueContext classe et l'API Classe DynamicFrameWriter.
# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0" ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2" ) job.commit()
Voici un exemple de script généré pour une source JDBC. La table source est une table d'employés avec la colonne empno
comme clé principale. Bien que par défaut la tâche utilise une clé principale séquentielle comme clé de marque-page si aucune clé de marque-page n'est spécifiée, car empno
n'est pas nécessairement séquentiel, il peut y avoir des lacunes dans les valeurs, elle n'est pas considérée comme une clé de marque-page par défaut. Par conséquent, le script désigne explicitement empno
comme clé de marque-page. Cette partie du code est affichée en italique.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"} ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2" ) job.commit()