Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Tipi e opzioni di connessione per ETL in AWS Glue per Spark
In AWS Glue per Spark, diversi metodi e trasformazioni PySpark e Scala specificano il tipo di connessione utilizzando un parametro connectionType
. Specificano le opzioni di connessione utilizzando un parametro connectionOptions
o options
.
Il parametro connectionType
può assumere i valori indicati nella tabella seguente. I valori dei parametri associati connectionOptions
(o options
) per ciascun tipo sono documentati nelle sezioni seguenti. Salvo indicazione contraria, i parametri si applicano quando la connessione viene utilizzata come sorgente o sink.
Per il codice di esempio che illustra l'impostazione e l'utilizzo delle opzioni di connessione, consulta la home page per ogni tipo di connessione.
connectionType |
Si connette a |
---|---|
dynamodb | Amazon DynamoDB database |
kinesis | Flusso di dati Amazon Kinesis |
s3 | Amazon S3 |
documentdb | Amazon DocumentDB (con compatibilità MongoDB) database |
opensearch | Servizio OpenSearch di Amazon. |
redshift | Database Amazon Redshift |
kafka | Kafka |
azurecosmos | Azure Cosmos per NoSQL. |
azuresql | Azure SQL. |
bigquery | Google BigQuery. |
mongodb | Database MongoDB |
sqlserver | Microsoft SQL Server database (vedere Connessioni JDBC) |
mysql | MySQL |
oracle | Oracle |
postgresql | PostgreSQL |
saphana | SAP HANA. |
snowflake | Data lake Snowflake |
teradata | Teradata Vantage. |
vertica | Vertica. |
personalizzato.* | Archivi dati Spark, Athena o JDBC (consulta Valori di personalizzazione e connectionType Marketplace AWS |
marketplace.* | Archivi dati Spark, Athena o JDBC (consulta Valori di personalizzazione e connectionType Marketplace AWS) |
Valori di personalizzazione e connectionType Marketplace AWS
Questi sono i seguenti:
-
"connectionType": "marketplace.athena"
: designa una connessione a un archivio dati Amazon Athena. La connessione utilizza un connettore di Marketplace AWS. -
"connectionType": "marketplace.spark"
: designa una connessione a un archivio dati Apache Spark. La connessione utilizza un connettore di Marketplace AWS. -
"connectionType": "marketplace.jdbc"
: designa una connessione a un archivio dati JDBC. La connessione utilizza un connettore di Marketplace AWS. -
"connectionType": "custom.athena"
: designa una connessione a un archivio dati Amazon Athena. La connessione utilizza un connettore personalizzato che va caricato in AWS Glue Studio. -
"connectionType": "custom.spark"
: designa una connessione a un archivio dati Apache Spark. La connessione utilizza un connettore personalizzato che va caricato in AWS Glue Studio. -
"connectionType": "custom.jdbc"
: designa una connessione a un archivio dati JDBC. La connessione utilizza un connettore personalizzato che va caricato in AWS Glue Studio.
Opzioni di connessione per il tipo custom.jdbc o marketplace.jdbc
-
className
: stringa, obbligatorio, nome della classe driver. -
connectionName
: stringa, obbligatorio, nome della connessione associata al connettore. -
url
: stringa, obbligatorio, URL JDBC con segnaposto (${}
) che vengono utilizzati per creare la connessione all'origine dati. Il segnaposto${secretKey}
viene sostituito con il segreto con lo stesso nome in AWS Secrets Manager. Per ulteriori informazioni sulla creazione dell'URL, fare riferimento alla documentazione dell'archivio dati. -
secretId
ouser/password
: stringa, obbligatorio, utilizzato per recuperare le credenziali per l'URL. -
dbTable
oquery
: stringa, obbligatorio, la tabella o la query SQL da cui ottenere i dati. Puoi specificaredbTable
oquery
, ma non entrambi. -
partitionColumn
: stringa, facoltativo, il nome di una colonna intera utilizzata per il partizionamento. Questa opzione funziona solo quando è inclusa conlowerBound
,upperBound
enumPartitions
. Questa opzione funziona allo stesso modo del lettore Spark SQL JDBC. Per ulteriori informazioni, consulta Da JDBC ad altri databasenel manuale Apache Spark SQL, DataFrames and Datasets Guide. I valori
lowerBound
eupperBound
vengono utilizzati per decidere lo stride della partizione, non per filtrare le righe nella tabella. Tutte le righe della tabella vengono partizionate e restituite.Nota
Quando si utilizza una query anziché un nome di tabella, è necessario verificare che la query funzioni con la condizione di partizionamento specificata. Ad esempio:
-
Se il formato della query è
"SELECT col1 FROM table1"
, testa la query aggiungendo una clausolaWHERE
alla fine della query che utilizza la colonna della partizione. -
Se il formato della query è "
SELECT col1 FROM table1 WHERE col2=val"
, testa la query estendendo la clausolaWHERE
conAND
e un'espressione che utilizza la colonna della partizione.
-
-
lowerBound
: intero, facoltativo, il valore minimo dipartitionColumn
che viene utilizzato per decidere lo stride della partizione. -
upperBound
: intero, facoltativo, il valore massimo dipartitionColumn
che viene utilizzato per decidere lo stride della partizione. -
numPartitions
: intero, facoltativo, il numero di partizioni. Questo valore, insieme alowerBound
(incluso) eupperBound
(escluso), forma stride di partizione per espressioni con le clausoleWHERE
generate che vengono utilizzate per dividere lapartitionColumn
.Importante
Presta attenzione al numero di partizioni perché troppe partizioni potrebbero causare problemi nei sistemi di database esterni.
-
filterPredicate
: stringa, opzionale, clausola condizione extra per filtrare i dati dall'origine. Ad esempio:BillingCity='Mountain View'
Quando si utilizza una query anziché un nome di table, è necessario verificare che la query funzioni con il
filterPredicate
specificato. Ad esempio:-
Se il formato della query è
"SELECT col1 FROM table1"
, testa la query aggiungendo una clausolaWHERE
alla fine della query che utilizza il predicato filtro. -
Se il formato della query è
"SELECT col1 FROM table1 WHERE col2=val"
, testa la query estendendo la clausolaWHERE
conAND
e un'espressione che utilizza il predicato filtro.
-
-
dataTypeMapping
: dizionario, opzionale, mappatura del tipo di dati personalizzata che crea una mappatura da un tipo di dati JDBC a un tipo di dati Glue. Ad esempio, l'opzione"dataTypeMapping":{"FLOAT":"STRING"}
mappa i campi di dati di tipo JDBCFLOAT
nel tipo JavaString
chiamando il metodoResultSet.getString()
del driver e lo usa per costruire registri di AWS Glue. L'oggettoResultSet
viene implementato da ciascun driver, quindi il comportamento è specifico del driver utilizzato. Consulta la documentazione relativa al driver JDBC per capire come il driver esegue le conversioni. -
I tipi di dati AWS Glue correntemente supportati sono:
-
DATE
-
STRING
-
TIMESTAMP
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
I tipi di dati JDBC supportati sono Java8 java.sql.types
. Le mappature di default dei tipi di dati (da JDBC a AWS Glue) sono:
-
DATE -> DATE
-
VARCHAR -> STRING
-
CHAR -> STRING
-
LONGNVARCHAR -> STRING
-
TIMESTAMP -> TIMESTAMP
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLEAN
-
BOOLEAN -> BOOLEAN
-
BIGINT -> LONG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> SHORT
-
SMALLINT -> SHORT
-
DOUBLE -> DOUBLE
Se si utilizza un mapping del tipo di dati personalizzato con l'opzione
dataTypeMapping
, è possibile sovrascrivere una mappatura di default del tipo di dati. Sono interessati solo i tipi di dati JDBC elencati nell'opzionedataTypeMapping
; per tutti gli altri tipi di dati JDBC viene utilizzata la mappatura di default. Se necessario, è possibile aggiungere mappature per tipi di dati JDBC aggiuntivi. Se un tipo di dati JDBC non è incluso nella mappatura di default o in una mappatura personalizzata, per impostazione predefinita viene convertito nel tipo di datiSTRING
AWS Glue. -
Negli esempi di codice Python riportati di seguito viene illustrato come leggere dai database JDBC con driver JDBC Marketplace AWS. Mostra la lettura da un database e la scrittura in una posizione S3.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Opzioni di connessione per il tipo custom.athena o marketplace.athena
-
className
– Stringa, obbligatorio, nome della classe driver. Quando si utilizza il connettore Athena-CloudWatch, questo valore di parametro è il prefisso della classe Name (ad esempio,"com.amazonaws.athena.connectors"
). Il connettore Athena-CloudWatch è composto da due classi: un gestore di metadati e un gestore di registri. Se si fornisce qui il prefisso comune, l'API carica le classi corrette in base a tale prefisso. -
tableName
: stringa, obbligatorio, il nome del flusso di log CloudWatch da leggere. In questo frammento di codice viene utilizzato il nome della vista specialeall_log_streams
, il che significa che il frame di dati dinamico restituito conterrà i dati di tutti i flussi di log nel gruppo di log. -
schemaName
: stringa, obbligatorio, il nome del gruppo di log CloudWatch da cui leggere. Ad esempio,/aws-glue/jobs/output
. -
connectionName
– Stringa, obbligatorio, nome della connessione associata al connettore.
Per ulteriori opzioni per questo connettore, consultare il README del connettore Amazon Athena CloudWatch
Il seguente esempio di codice Python mostra come leggere da un archivio dati Athena utilizzando un connettore Marketplace AWS. Mostra la lettura da Athena e la scrittura in una posizione S3.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Opzioni di connessione per il tipo custom.spark o marketplace.spark
-
className
: stringa, obbligatorio, nome della classe del connettore. -
secretId
: stringa, facoltativo, utilizzato per recuperare le credenziali per la connessione del connettore. -
connectionName
– Stringa, obbligatorio, nome della connessione associata al connettore. -
Altre opzioni dipendono dall'archivio dati. Ad esempio, le opzioni di configurazione di OpenSearch iniziano con il prefisso
es
, come descritto nella documentazione di Elasticsearch per Apache Hadoop. Le connessioni Spark a Snowflake utilizzano opzioni come sfUser
esfPassword
, come descritto in Using the Spark Connectornella guida Connecting to Snowflake.
Il seguente esempio di codice Python mostra come leggere da un archivio dati OpenSearch utilizzando una connessione marketplace.spark
.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<AWS endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Opzioni generali
Le opzioni in questa sezione sono fornite come connettore connection_options
, ma non si applicano specificamente a tale connettore.
I seguenti parametri vengono generalmente utilizzati per la configurazione dei segnalibri. Possono applicarsi ai flussi di lavoro Amazon S3 o JDBC. Per ulteriori informazioni, consulta Utilizzo di segnalibri di processo.
jobBookmarkKeys
: un array di nomi di colonna.jobBookmarkKeysSortOrder
: una stringa che definisce come confrontare i valori in base all'ordinamento. Valori validi:"asc"
,"desc"
.useS3ListImplementation
: utilizzato per gestire le prestazioni della memoria quando si elencano i contenuti dei bucket Amazon S3. Per ulteriori informazioni, consulta la pagina Optimize memory management in AWS Glue.