Types de connexion et options ETL pour AWS Glue pour Spark - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Types de connexion et options ETL pour AWS Glue pour Spark

Entrée AWS Glue pour Spark, various PySpark et les méthodes et transformations Scala spécifient le type de connexion à l'aide d'un connectionType paramètre. Ils spécifient des options de connexion à l'aide d'un paramètre connectionOptions ou options.

Le paramètre connectionType peut prendre les valeurs indiquées dans le tableau suivant. Les valeurs de paramètre associées connectionOptions (ou options) pour chaque type sont documentées dans les sections suivantes. Sauf indication contraire, les paramètres s'appliquent lorsque la connexion est utilisée comme source ou comme collecteur.

Pour obtenir un exemple de code qui illustre le paramétrage et l’utilisation des options de connexion, consultez la page d’accueil de chaque type de connexion.

connectionType Se connecte à
dynamodb Base de données Amazon DynamoDB
kinesis Amazon Kinesis Data Streams
s3 Amazon S3
documentdb Base de données Amazon DocumentDB (compatible avec MongoDB)
openSearch Amazon OpenSearch Service.
redshift Base de données Amazon Redshift
kafka Kafka ou Amazon Managed Streaming for Apache Kafka
azurecosmos Azure Cosmos pour NoSQL.
azuresql AzureSQL.
bigquery Google BigQuery.
mongodb Base de données MongoDB, y compris MongoDB Atlas.
sqlserver Base SQL de données Microsoft Server (voirJDBCConnexions )
mysql Ma SQL base de données (voirJDBCConnexions )
oracle Base de données Oracle (consultez JDBCConnexions )
postgresql Base de SQL données Postgre (voirJDBCConnexions )
saphana SAP HANA.
snowflake Lac de données Snowflake
teradata Teradata Vantage.
vertica Vertica.
personnalisé.* Spark, Athena ou les magasins de JDBC données (voir Personnalisation et AWS Marketplace connectionType valeurs
marketplace.* Spark, Athena ou les magasins de JDBC données (voir) Personnalisation et AWS Marketplace connectionType valeurs

DataFrame options pour ETL AWS Glue 5.0 pour Spark

A DataFrame est un ensemble de données organisé en colonnes nommées similaires à une table et prenant en charge les opérations de style fonctionnel (map/reduce/filter/etc.) et les SQL opérations (sélection, projet, agrégation).

Pour créer un DataFrame pour une source de données prise en charge par Glue, les éléments suivants sont requis :

  • connecteur de source de données ClassName

  • connexion à une source de données Options

De même, pour écrire un dans DataFrame un récepteur de données supporté par Glue, il en va de même :

  • connecteur de récepteur de données ClassName

  • connexion au récepteur de données Options

Notez que les fonctionnalités de AWS Glue telles que les signets de tâches et les DynamicFrame options telles que celles-ci ne connectionName sont pas prises en charge dans DataFrame. Pour plus de détails sur les opérations prises en charge DataFrame et pour en savoir plus, consultez la documentation de Spark pour DataFrame.

Spécification du connecteur ClassName

Pour spécifier une source/un récepteur ClassName de données, utilisez l'.formatoption permettant de fournir le connecteur correspondant ClassName qui définit la source/le récepteur de données.

Connecteurs JDBC

Pour les JDBC connecteurs, spécifiez jdbc la valeur de l'.formatoption et indiquez le JDBC pilote ClassName dans l'driveroption.

df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")... df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...

Le tableau suivant répertorie le JDBC pilote ClassName de la source de données prise en charge dans AWS Glue for DataFrames.

Source de données Chauffeur ClassName
Poster SQL pilote org.postgreSQL
Oracle oracle.jdbc.driver. OracleDriver
SQLServer com.microsoft.sqlserver.jdbc. SQLServerDriver
Mon SQL com.mysql.jdbc.driver
SAPHana com.sap.db.jdbc.driver
Teradata com.teradata.jdbc. TeraDriver
Connecteurs Spark

Pour les connecteurs Spark, spécifiez le ClassName connecteur comme valeur de l'.formatoption.

df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")... df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...

Le tableau suivant répertorie le connecteur Spark ClassName de la source de données prise en charge dans AWS Glue for DataFrames.

Source de données ClassName
MongoDB/DocumentDB glue.spark.mongodb
Redshift io.github.spark_redshift_community.spark.redshift
AzureCosmos cosmos.oltp
Azure SQL com.microsoft.sqlserver.jdbc.spark
BigQuery com.google.cloud.spark.bigquery
OpenSearch org.opensearch.spark.sql
Snowflake net.snowflake.spark.snowflake
Vertica com.vertica.spark.datasource. VerticaSource

Spécification des options de connexion

Pour spécifier Options la connexion à une source/un récepteur de données, utilisez le .option(<KEY>, <VALUE>) pour fournir des options individuelles ou .options(<MAP>) pour fournir plusieurs options sous forme de carte clé-valeur.

Chaque source/récepteur de données prend en charge son propre ensemble de connexions. Options Pour plus de détails sur le connecteur Spark disponibleOptions, reportez-vous à la documentation publique du connecteur Spark spécifique à la source/au récepteur de données répertorié dans le tableau suivant.

Exemples

Les exemples suivants sont extraits de Postgre SQL et écrits dans SnowFlake :

Python

Exemple :

from awsglue.context import GlueContext from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() dataSourceClassName = "jdbc" dataSourceOptions = { "driver": "org.postgresql.Driver", "url": "<url>", "user": "<user>", "password": "<password>", "dbtable": "<dbtable>", } dataframe = spark.read.format(className).options(**options).load() dataSinkClassName = "net.snowflake.spark.snowflake" dataSinkOptions = { "sfUrl": "<url>", "sfUsername": "<username>", "sfPassword": "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" } dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
Scala

Exemple :

import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate() val dataSourceClassName = "jdbc" val dataSourceOptions = Map( "driver" -> "org.postgresql.Driver", "url" -> "<url>", "user" -> "<user>", "password" -> "<password>", "dbtable" -> "<dbtable>" ) val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load() val dataSinkClassName = "net.snowflake.spark.snowflake" val dataSinkOptions = Map( "sfUrl" -> "<url>", "sfUsername" -> "<username>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()

Personnalisation et AWS Marketplace connectionType valeurs

Tel est le cas des éléments suivants :

  • "connectionType": "marketplace.athena" : désigne une connexion à un magasin de données Amazon Athena. La connexion utilise un connecteur provenant de AWS Marketplace.

  • "connectionType": "marketplace.spark" : désigne une connexion à un magasin de données Apache Spark. La connexion utilise un connecteur provenant de AWS Marketplace.

  • "connectionType": "marketplace.jdbc": désigne une connexion à un magasin de JDBC données. La connexion utilise un connecteur provenant de AWS Marketplace.

  • "connectionType": "custom.athena" : désigne une connexion à un magasin de données Amazon Athena. La connexion utilise un connecteur personnalisé que vous téléchargez vers AWS Glue Studio.

  • "connectionType": "custom.spark" : désigne une connexion à un magasin de données Apache Spark. La connexion utilise un connecteur personnalisé que vous téléchargez vers AWS Glue Studio.

  • "connectionType": "custom.jdbc": désigne une connexion à un magasin de JDBC données. La connexion utilise un connecteur personnalisé que vous téléchargez vers AWS Glue Studio.

Options de connexion pour le type custom.jdbc ou marketplace.jdbc

  • className — chaîne, obligatoire, nom de la classe du pilote.

  • connectionName — chaîne, obligatoire, nom de la connexion associée au connecteur.

  • url— Chaîne, obligatoire, JDBC URL avec des espaces réservés (${}) utilisés pour établir la connexion à la source de données. L'espace réservé ${secretKey} est remplacé par le secret du même nom dans AWS Secrets Manager. Reportez-vous à la documentation du magasin de données pour plus d'informations sur la création duURL.

  • secretIdou user/password — Chaîne, obligatoire, utilisée pour récupérer les informations d'identification duURL.

  • dbTableou query — Chaîne, obligatoire, la table ou la SQL requête à partir de laquelle les données doivent être obtenues. Vous pouvez préciser dbTable ou query, mais pas les deux.

  • partitionColumn — chaîne, facultatif, nom d'une colonne entière utilisée pour le partitionnement. Cette option fonctionne uniquement lorsqu'elle est incluse dans lowerBound, upperBound et numPartitions. Cette option fonctionne de la même manière que dans le SQL JDBC lecteur Spark. Pour plus d'informations, consultez Vers JDBCd'autres bases de données dans le guide Apache Spark SQL DataFrames et Datasets.

    Les valeurs lowerBound et upperBound sont utilisées pour décider de la progression de la partition, pas pour filtrer les lignes de la table. Toutes les lignes de la table sont partitionnées et renvoyées.

    Note

    Lorsque vous utilisez une requête au lieu d'un nom de table, vous devez valider que la requête fonctionne avec la condition de partitionnement spécifiée. Par exemple :

    • Si le format de votre requête est "SELECT col1 FROM table1", testez la requête en ajoutant une clause WHERE à la fin de la requête qui utilise la colonne de partition.

    • Si le format de votre requête est SELECT col1 FROM table1 WHERE col2=val", testez la requête en étendant la clause WHERE avec AND et une expression qui utilise la colonne de partition.

  • lowerBound — entier, facultatif, valeur minimale de partitionColumn qui est utilisée pour décider de la progression de la partition.

  • upperBound — entier, facultatif, valeur maximale de partitionColumn qui est utilisée pour décider de la progression de la partition.

  • numPartitions — entier, facultatif, nombre de partitions. Cette valeur, ainsi que lowerBound (inclusive) et upperBound (exclusive) forment les progressions de partition pour les expressions de clause WHERE générées qui sont utilisées pour diviser le fichier partitionColumn.

    Important

    Soyez prudent avec le nombre de partitions, car avoir un trop grand nombre de partitions peut causer des problèmes sur vos systèmes de bases de données externes.

  • filterPredicate — chaîne, facultative, clause de condition supplémentaire pour filtrer les données à partir de la source. Par exemple :

    BillingCity='Mountain View'

    Lorsque vous utilisez une requête au lieu d'un nom de table, vous devez vérifier que la requête fonctionne avec le filterPredicate spécifié. Par exemple :

    • Si le format de votre requête est "SELECT col1 FROM table1", testez la requête en ajoutant une clause WHERE à la fin de la requête qui utilise le prédicat de filtre.

    • Si le format de votre requête est "SELECT col1 FROM table1 WHERE col2=val", testez la requête en étendant la clause WHERE avec AND et une expression qui utilise le prédicat de filtre.

  • dataTypeMapping— Cartographie de type de données personnalisée optionnelle par dictionnaire qui crée un mappage entre un type de JDBCdonnées et un type de données Glue. Par exemple, l'option "dataTypeMapping":{"FLOAT":"STRING"} mappe les champs de JDBC données de String type Java FLOAT en appelant la ResultSet.getString() méthode du pilote et l'utilise pour créer AWS Glue records. L'objet est ResultSet implémenté par chaque pilote, donc le comportement est spécifique au pilote que vous utilisez. Reportez-vous à la documentation destinée à votre JDBC chauffeur pour comprendre comment il effectue les conversions.

  • Le AWS Glue les types de données actuellement pris en charge sont les suivants :

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    Les types de JDBC données pris en charge sont Java8 java.sql.types.

    Les mappages de types de données par défaut (de à JDBC AWS Glue) sont :

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    Si vous utilisez un mappage de type de données personnalisé avec l'option dataTypeMapping, vous pouvez remplacer un mappage de type de données par défaut. Seuls les types de JDBC données répertoriés dans l'dataTypeMappingoption sont concernés ; le mappage par défaut est utilisé pour tous les autres types de JDBC données. Vous pouvez ajouter des mappages pour des types de JDBC données supplémentaires si nécessaire. Si un type de JDBC données n'est inclus ni dans le mappage par défaut ni dans un mappage personnalisé, le type de données est converti en AWS Glue STRINGtype de données par défaut.

L'exemple de code Python suivant montre comment lire à partir de JDBC bases de données à l'aide de AWS Marketplace JDBC pilotes. Il montre la lecture à partir d'une base de données et l'écriture dans un emplacement S3.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Options de connexion pour le type custom.athena ou marketplace.athena

  • className — chaîne, obligatoire, nom de la classe du pilote. Lorsque vous utilisez le CloudWatch connecteur Athena, cette valeur de paramètre est le préfixe de la classe Name (par exemple,). "com.amazonaws.athena.connectors" Le CloudWatch connecteur Athena est composé de deux classes : un gestionnaire de métadonnées et un gestionnaire d'enregistrements. Si vous fournissez le préfixe commun ici, les classes correctes API sont chargées en fonction de ce préfixe.

  • tableName— Chaîne, obligatoire, nom du flux de CloudWatch journal à lire. Cet extrait de code utilise le nom de vue spécial all_log_streams, ce qui signifie que la trame de données dynamique renvoyée contiendra les données de tous les flux de journaux du groupe de journaux.

  • schemaName— Chaîne, obligatoire, nom du groupe de CloudWatch journaux à partir duquel la lecture doit être effectuée. Par exemple, /aws-glue/jobs/output.

  • connectionName — chaîne, obligatoire, nom de la connexion associée au connecteur.

Pour des options supplémentaires pour ce connecteur, consultez le README fichier du CloudWatch connecteur Amazon Athena sur. GitHub

L'exemple de code Python suivant montre comment lire à partir d'un magasin de données Athena à l'aide d'un connecteur AWS Marketplace . Il montre la lecture à partir d'Athena et l'écriture dans un emplacement S3.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Options de connexion pour le type custom.spark ou marketplace.spark

  • className — chaîne, obligatoire, nom de la classe du connecteur.

  • secretId — chaîne, facultative, utilisée pour récupérer les informations d'identification pour la connexion du connecteur.

  • connectionName — chaîne, obligatoire, nom de la connexion associée au connecteur.

  • D'autres options dépendent du magasin de données. Par exemple, les options OpenSearch de configuration commencent par le préfixees, comme décrit dans la documentation d'Elasticsearch pour Apache Hadoop. Les connexions Spark à Snowflake utilisent des options telles que sfUser et sfPassword, comme décrit dans Using the Spark Connector dans le guide Connecting to Snowflake.

L'exemple de code Python suivant montre comment lire depuis un magasin de OpenSearch données à l'aide d'une marketplace.spark connexion.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Options générales

Les options de cette section sont fournies en tant que connection_options, mais ne s'appliquent pas à un connecteur en particulier.

Les paramètres suivants sont généralement utilisés lors de la configuration de signets. Ils peuvent s'appliquer à Amazon S3 ou à des JDBC flux de travail. Pour de plus amples informations, veuillez consulter Utilisation des marque-pages de tâche.

  • jobBookmarkKeys : un tableau des noms des colonnes.

  • jobBookmarkKeysSortOrder : chaîne définissant comment comparer les valeurs en fonction de l'ordre de tri. Valeurs valides : "asc", "desc".

  • useS3ListImplementation : utilisé pour gérer les performances de la mémoire lors de la liste du contenu du compartiment Amazon S3. Pour plus d'informations, consultez la section Optimisation de la gestion de la mémoire dans AWS Glue.