GlueContext classe - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

GlueContext classe

Racchiude l'SparkContextoggetto Apache Spark e fornisce quindi meccanismi per interagire con la piattaforma Apache Spark.

__init__

__init__(sparkContext)
  • sparkContext: il contesto Apache Spark da usare.

Creazione

getSource

getSource(connection_type, transformation_ctx = "", **options)

Crea un oggetto DataSource che può essere utilizzato per leggere DynamicFrames da fonti esterne.

  • connection_type: il tipo di connessione da utilizzare, ad esempio Amazon Simple Storage Service (Amazon S3), Amazon Redshift e JDBC. I valori validi includono s3, mysql, postgresql, redshift, sqlserver, oracle e dynamodb.

  • transformation_ctx: il contesto di trasformazione da usare (opzionale).

  • options: una raccolta di coppie nome/valore opzionali. Per ulteriori informazioni, consulta Tipi di connessione e opzioni per ETL in AWS Glue per Spark.

Di seguito è riportato un esempio dell'utilizzo di getSource:

>>> data_source = context.getSource("file", paths=["/in/path"]) >>> data_source.setFormat("json") >>> myFrame = data_source.getFrame()

create_dynamic_frame_from_rdd

create_dynamic_frame_from_rdd(data, name, schema=None, sample_ratio=None, transformation_ctx="")

Restituisce un DynamicFrame che viene creato da un Apache Spark Resilient Distributed Dataset (RDD).

  • data: l'origine dati da usare.

  • name: il nome dei dati da usare.

  • schema: lo schema da usare (opzionale).

  • sample_ratio: il rapporto di esempio da usare (opzionale).

  • transformation_ctx: il contesto di trasformazione da usare (opzionale).

create_dynamic_frame_from_catalog

create_dynamic_frame_from_catalog(database, table_name, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, catalog_id = None)

Restituisce un DynamicFrame creato utilizzando un database del catalogo dati e un nome della tabella. Quando si utilizza questo metodo, si forniscono format_options le proprietà della tabella AWS Glue Data Catalog specificata e altre opzioni tramite l'additional_optionsargomento.

  • Database: il database da cui leggere.

  • table_name: il nome della tabella da cui leggere.

  • redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).

  • transformation_ctx: il contesto di trasformazione da usare (opzionale).

  • push_down_predicate: filtra le partizioni senza dover elencare e leggere tutti i file nel set di dati. Per le fonti e le limitazioni supportate, consulta Ottimizzazione delle letture con pushdown in AWS Glue ETL. Per ulteriori informazioni, consulta Prefiltraggio con i predicati pushdown.

  • additional_options: una raccolta di coppie nome/valore opzionali. Le opzioni possibili includono quelle elencate in Tipi di connessione e opzioni per ETL in AWS Glue per Spark ad eccezione di endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classification e delimiter. Un'altra opzione supportata è catalogPartitionPredicate:

    catalogPartitionPredicate — È possibile passare un'espressione di catalogo per filtrare in base alle colonne di indice. Questo esegue il push down del filtro sul lato server. Per ulteriori informazioni, consulta la pagina relativa agli indici di partizionamento di AWS Glue. Tieni presente che push_down_predicate e catalogPartitionPredicate usano sintassi diverse. Il primo utilizza la sintassi standard Spark SQL e il secondo utilizza il parser JSQL.

  • catalog_id: l'ID catalogo (ID account) del catalogo dati a cui si accede. Se Nessuno, viene utilizzato l'ID account predefinito del chiamante.

create_dynamic_frame_from_options

create_dynamic_frame_from_options(connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

Restituisce un DynamicFrame creato con la connessione e il formato specificati.

  • connection_type: il tipo di connessione, come Amazon S3, Amazon Redshift e JDBC. I valori validi includono s3, mysql, postgresql, redshift, sqlserver, oracle e dynamodb.

  • connection_options: opzioni di connessione, come tabella di database e percorsi (facoltativo). Per un oggetto connection_type di s3, viene definito un elenco di percorsi Amazon S3.

    connection_options = {"paths": ["s3://aws-glue-target/temp"]}

    Per le connessioni JDBC, diverse proprietà devono essere definite. Il nome del database deve fare parte dell'URL. Puoi opzionalmente essere incluso nelle opzioni di connessione.

    avvertimento

    Si consiglia di non archiviare le password nello script. Valuta la possibilità boto3 di utilizzarli per recuperarli da AWS Secrets Manager o dal AWS Glue Data Catalog.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    La proprietà dbtable è il nome della tabella JDBC. Per i archivi dati JDBC che supportano schemi all'interno di un database, specifica schema.table-name. Se non viene fornito alcuno schema, viene usato lo schema "pubblico" predefinito.

    Per ulteriori informazioni, consulta Tipi di connessione e opzioni per ETL in AWS Glue per Spark.

  • format— Una specifica di formato. Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.

  • format_options: opzioni di formato per il formato specificato. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.

  • transformation_ctx: il contesto di trasformazione da usare (opzionale).

  • push_down_predicate: filtra le partizioni senza dover elencare e leggere tutti i file nel set di dati. Per le fonti e le limitazioni supportate, consulta Ottimizzazione delle letture con pushdown in AWS Glue ETL. Per ulteriori informazioni, consulta Prefiltraggio con i predicati pushdown.

create_sample_dynamic_frame_from_catalog

create_sample_dynamic_frame_from_catalog(database, table_name, num, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, sample_options = {}, catalog_id = None)

Restituisce un DynamicFrame di esempio creato utilizzando un database del catalogo dati e un nome della tabella. La DynamicFrame contiene solo i primi num registri da un'origine dati.

  • database: il database da cui leggere.

  • table_name: il nome della tabella da cui leggere.

  • num: il numero massimo di registri nel frame dinamico di esempio restituito.

  • redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).

  • transformation_ctx: il contesto di trasformazione da usare (opzionale).

  • push_down_predicate: filtra le partizioni senza dover elencare e leggere tutti i file nel set di dati. Per ulteriori informazioni, consulta Prefiltraggio con i predicati pushdown.

  • additional_options: una raccolta di coppie nome/valore opzionali. Le opzioni possibili includono quelle elencate in Tipi di connessione e opzioni per ETL in AWS Glue per Spark ad eccezione di endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classification e delimiter.

  • sample_options: parametri per controllare il comportamento di campionamento (facoltativo). Parametri attuali disponibili per le origini Amazon S3:

    • maxSamplePartitions: il numero massimo di partizioni che il campionamento leggerà. Il valore predefinito è 10

    • maxSampleFilesPerPartition: il numero massimo di file che il campionamento leggerà in una partizione. Il valore predefinito è 10.

      Questi parametri aiutano a ridurre il tempo impiegato dall'elenco dei file. Ad esempio, supponiamo che il set di dati contenga 1000 partizioni e ogni partizione contenga 10 file. Se hai impostato maxSamplePartitions = 10 e maxSampleFilesPerPartition = 10, invece di elencare tutti i 10.000 file, il campionamento elencherà e leggerà solo le prime 10 partizioni con i primi 10 file in ognuna di esse: 10*10 = 100 file in totale.

  • catalog_id: l'ID catalogo del catalogo dati a cui si accede (l'ID account del catalogo dati). Impostato su None per default. L'impostazione predefinita di None è l'ID catalogo dell'account chiamante nel servizio.

create_sample_dynamic_frame_from_options

create_sample_dynamic_frame_from_options(connection_type, connection_options={}, num, sample_options={}, format=None, format_options={}, transformation_ctx = "")

Restituisce un DynamicFrame di esempio creato con la connessione e il formato specificati. La DynamicFrame contiene solo i primi num registri da un'origine dati.

  • connection_type: il tipo di connessione, come Amazon S3, Amazon Redshift e JDBC. I valori validi includono s3, mysql, postgresql, redshift, sqlserver, oracle e dynamodb.

  • connection_options: opzioni di connessione, come tabella di database e percorsi (facoltativo). Per ulteriori informazioni, consulta Tipi di connessione e opzioni per ETL in AWS Glue per Spark.

  • num: il numero massimo di registri nel frame dinamico di esempio restituito.

  • sample_options: parametri per controllare il comportamento di campionamento (facoltativo). Parametri attuali disponibili per le origini Amazon S3:

    • maxSamplePartitions: il numero massimo di partizioni che il campionamento leggerà. Il valore predefinito è 10

    • maxSampleFilesPerPartition: il numero massimo di file che il campionamento leggerà in una partizione. Il valore predefinito è 10.

      Questi parametri aiutano a ridurre il tempo impiegato dall'elenco dei file. Ad esempio, supponiamo che il set di dati contenga 1000 partizioni e ogni partizione contenga 10 file. Se hai impostato maxSamplePartitions = 10 e maxSampleFilesPerPartition = 10, invece di elencare tutti i 10.000 file, il campionamento elencherà e leggerà solo le prime 10 partizioni con i primi 10 file in ognuna di esse: 10*10 = 100 file in totale.

  • format— Una specifica di formato. Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.

  • format_options: opzioni di formato per il formato specificato. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.

  • transformation_ctx: il contesto di trasformazione da usare (opzionale).

  • push_down_predicate: filtra le partizioni senza dover elencare e leggere tutti i file nel set di dati. Per ulteriori informazioni, consulta Prefiltraggio con i predicati pushdown.

add_ingestion_time_columns

add_ingestion_time_columns(dataFrame, timeGranularity = "")

Aggiunge colonne del tempo di importazione dati come ingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute al DataFrame di input. Questa funzione viene generata automaticamente nello script generato da AWS Glue quando si specifica una tabella del catalogo dati con Amazon S3 come destinazione. Questa funzione aggiorna automaticamente la partizione con le colonne del tempo di importazione dati nella tabella di output. Ciò consente ai dati di output di venire partizionati automaticamente nel tempo di importazione dati senza necessitare di colonne di tempo di inserimento esplicite nei dati di input.

  • dataFrame: il dataFrame al quale aggiungere le colonne del tempo di importazione dati.

  • timeGranularity: la granularità delle colonne temporali. I valori validi sono "day", "hour" e "minute". Ad esempio, se "hour" viene passato alla funzione, il dataFrame originale avrà "ingest_year", "ingest_month", "ingest_day" e "ingest_hour" colonne temporali aggiunte.

Restituisce il frame di dati dopo l'aggiunta di colonne di granularità di tempo.

Esempio:

dynamic_frame = DynamicFrame.fromDF(glueContext.add_ingestion_time_columns(dataFrame, "hour"))

create_data_frame_from_catalog

create_data_frame_from_catalog(database, table_name, transformation_ctx = "", additional_options = {})

Restituisce un DataFrame creato utilizzando le informazioni da una tabella del catalogo dati.

  • database: il database del catalogo dati da cui leggere.

  • table_name: il nome della tabella de catalogo dati da cui leggere.

  • transformation_ctx: il contesto di trasformazione da usare (opzionale).

  • additional_options: una raccolta di coppie nome/valore opzionali. Le opzioni possibili includono quelle elencate in Tipi di connessione e opzioni per ETL in AWS Glue per Spark per le origini di streaming, ad esempio startingPosition, maxFetchTimeInMs, e startingOffsets.

    • useSparkDataSource— Se impostato su true, forza AWS Glue a utilizzare l'API Spark Data Source nativa per leggere la tabella. L'API Spark Data Source supporta i seguenti formati: AVRO, binario, CSV, JSON, ORC, Parquet e testo. In una tabella del catalogo dati, il formato può essere specificato utilizzando la proprietà classification. Per ulteriori informazioni sull'API Spark Data Source, consulta la documentazione ufficiale di Apache Spark.

      L'uso di create_data_frame_from_catalog con useSparkDataSource offre i seguenti vantaggi:

      • Restituisce direttamente un DataFrame e fornisce un'alternativa a create_dynamic_frame.from_catalog().toDF().

      • Supporta il controllo delle autorizzazioni AWS Lake Formation a livello di tabella per i formati nativi.

      • Supporta la lettura dei formati Data Lake senza il controllo delle autorizzazioni a AWS Lake Formation livello di tabella. Per ulteriori informazioni, consulta Utilizzo di framework data lake con processi ETL di AWS Glue.

      Quando abilitiuseSparkDataSource, puoi anche aggiungere una qualsiasi delle opzioni di Spark Data Source secondo necessità. additional_options AWS Glue trasmette queste opzioni direttamente al lettore Spark.

    • useCatalogSchema— Se impostato su true, AWS Glue applica lo schema Data Catalog al risultatoDataFrame. Altrimenti, il lettore deduce lo schema dai dati. Se abiliti l'opzione useCatalogSchema, dovrai impostare anche useSparkDataSource su true.

Limitazioni

Quando utilizzi l'opzione useSparkDataSource considera le seguenti limitazioni:

  • Quando lo usiuseSparkDataSource, AWS Glue ne crea una nuova DataFrame in una sessione Spark separata diversa dalla sessione Spark originale.

  • Il filtro DataFrame delle partizioni Spark non funziona con le seguenti funzionalità di AWS Glue.

    Per utilizzare il filtraggio delle partizioni con queste funzionalità, è possibile utilizzare il predicato pushdown AWS Glue. Per ulteriori informazioni, consulta Prefiltraggio con i predicati pushdown. Il filtraggio sulle colonne non partizionate non viene modificato.

    Lo script di esempio seguente dimostra il modo errato di eseguire il filtraggio delle partizioni con l'opzione excludeStorageClasses.

    // Incorrect partition filtering using Spark filter with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Suppose year and month are partition keys. // Filtering on year and month won't work, the filtered_df will still // contain data with other year/month values. filtered_df = read_df.filter("year == '2017 and month == '04' and 'state == 'CA'")

    Lo script di esempio seguente dimostra il modo corretto di utilizzare un predicato pushdown in modo da eseguire il filtraggio delle partizioni con l'opzione excludeStorageClasses.

    // Correct partition filtering using the AWS Glue pushdown predicate // with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, // Use AWS Glue pushdown predicate to perform partition filtering push_down_predicate = "(year=='2017' and month=='04')" additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Use Spark filter only on non-partitioned columns filtered_df = read_df.filter("state == 'CA'")

Esempio: creazione di una tabella CSV utilizzando il lettore di origini dati Spark

// Read a CSV table with '\t' as separator read_df = glueContext.create_data_frame.from_catalog( database=<database_name>, table_name=<table_name>, additional_options = {"useSparkDataSource": True, "sep": '\t'} )

create_data_frame_from_options

create_data_frame_from_options(connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

Questa API è obsoleta. Utilizza invece le API getSource(). Restituisce un DataFrame creato con la connessione e il formato specificati. Utilizza questa funzione solo con origini di streaming AWS Glue.

  • connection_type: il tipo di connessione streaming. I valori validi includono kinesis e kafka.

  • connection_options: opzioni di connessione, che sono diverse per Kinesis e Kafka. È possibile trovare l'elenco di tutte le opzioni di connessione per ogni origine dati di streaming all'indirizzo Tipi di connessione e opzioni per ETL in AWS Glue per Spark. Di seguito vengono illustrate le differenze delle opzioni di connessione di streaming:

    • Le origini di streaming di Kinesis richiedono streamARN, startingPosition, inferSchema e classification.

    • Le origini di streaming di Kafka richiedono connectionName, topicName, startingOffsets, inferSchema e classification.

  • format— Una specifica di formato. Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Per ulteriori informazioni sui formati supportati, consulta Opzioni del formato dati per input e output in AWS Glue per Spark.

  • format_options: opzioni di formato per il formato specificato. Per ulteriori informazioni sulle opzioni di formato supportate, consulta Opzioni del formato dati per input e output in AWS Glue per Spark.

  • transformation_ctx: il contesto di trasformazione da usare (opzionale).

Esempio per l'origine di streaming Amazon Kinesis:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Esempio per l'origine di streaming Kafka:

kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

forEachBatch

forEachBatch(frame, batch_function, options)

Applica il batch_function passato a ogni micro batch che viene letto dall'origine di streaming.

  • frame— Il DataFrame contenente il microbatch corrente.

  • batch_function: una funzione che verrà applicata per ogni micro batch.

  • options: una raccolta di coppie chiave-valore che contiene informazioni su come elaborare micro batch. Sono richieste le seguenti opzioni:

    • windowSize: la quantità di tempo da dedicare all'elaborazione di ciascun batch.

    • checkpointLocation: la posizione in cui sono archiviati i checkpoint per il processo ETL di streaming.

    • batchMaxRetries: numero massimo di tentativi per riprovare il processo se il batch ha esito negativo. Il valore predefinito è 3. Questa opzione è configurabile solo per Glue versione 2.0 e successive.

Esempio:

glueContext.forEachBatch( frame = data_frame_datasource0, batch_function = processBatch, options = { "windowSize": "100 seconds", "checkpointLocation": "s3://kafka-auth-dataplane/confluent-test/output/checkpoint/" } ) def processBatch(data_frame, batchId): if (data_frame.count() > 0): datasource0 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame" ) additionalOptions_datasink1 = {"enableUpdateCatalog": True} additionalOptions_datasink1["partitionKeys"] = ["ingest_yr", "ingest_mo", "ingest_day"] datasink1 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database = "tempdb", table_name = "kafka-auth-table-output", transformation_ctx = "datasink1", additional_options = additionalOptions_datasink1 )

Utilizzo di set di dati in Amazon S3

purge_table

purge_table(catalog_id=None, database="", table_name="", options={}, transformation_ctx="")

Elimina i file da Amazon S3 per il database e la tabella del catalogo specificati. Se tutti i file in una partizione vengono eliminati, anche la partizione viene eliminata dal catalogo.

Per poter recuperare gli oggetti eliminati, puoi abilitare la funzione di controllo delle versioni degli oggetti nel bucket Amazon S3. Quando un oggetto viene eliminato da un bucket per il quale non è abilitata la funzione Versioni multiple degli oggetti, l'oggetto non può essere recuperato. Per ulteriori informazioni su come recuperare gli oggetti eliminati in un bucket abilitato per le versioni, consulta In che modo può essere recuperato un oggetto Amazon S3 che è stato eliminato? nel Portale del sapere di Supporto AWS .

  • catalog_id: l'ID catalogo del catalogo dati a cui si accede (l'ID account del catalogo dati). Impostato su None per default. L'impostazione predefinita di None è l'ID catalogo dell'account chiamante nel servizio.

  • database: il database da usare.

  • table_name: il nome della tabella da usare.

  • options: opzioni per filtrare i file da eliminare e per la generazione di file manifesto.

    • retentionPeriod: specifica un periodo in numero di ore per la conservazione dei file. I file più recenti del periodo di conservazione vengono mantenuti. Impostato su 168 ore (7 giorni) per impostazione predefinita.

    • partitionPredicate: le partizioni che soddisfano questo predicato vengono eliminate. I file all'interno del periodo di conservazione in queste partizioni non vengono eliminati. Impostato su "": vuoto per impostazione predefinita.

    • excludeStorageClasses: i file con classe di storage nel excludeStorageClasses non vengono eliminati. L'impostazione di default è Set(): un set vuoto.

    • manifestFilePath: un percorso facoltativo per la generazione di file manifesto. Tutti i file che sono stati eliminati correttamente vengono registrati in Success.csv e quelli che non sono riusciti in Failed.csv

  • transformation_ctx: il contesto di trasformazione da usare (opzionale). Utilizzato nel percorso del file manifest.

glueContext.purge_table("database", "table", {"partitionPredicate": "(month=='march')", "retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})

purge_s3_path

purge_s3_path(s3_path, options={}, transformation_ctx="")

Elimina i file dal percorso Amazon S3 specificato in modo ricorsivo.

Per poter recuperare gli oggetti eliminati, puoi abilitare la funzione di controllo delle versioni degli oggetti nel bucket Amazon S3. Quando un oggetto viene eliminato da un bucket per il quale non è abilitata la funzione di controllo delle versioni degli oggetti, l'oggetto non può essere recuperato. Per ulteriori informazioni su come recuperare oggetti eliminati in un bucket con il controllo delle versioni, vedi Come posso recuperare un oggetto Amazon S3 che è stato eliminato? nel Knowledge Center. Supporto

  • s3_path: il percorso in Amazon S3 dei file da eliminare nel formato s3://<bucket>/<prefix>/

  • options: opzioni per filtrare i file da eliminare e per la generazione di file manifesto.

    • retentionPeriod: specifica un periodo in numero di ore per la conservazione dei file. I file più recenti del periodo di conservazione vengono mantenuti. Impostato su 168 ore (7 giorni) per impostazione predefinita.

    • excludeStorageClasses: i file con classe di storage nel excludeStorageClasses non vengono eliminati. L'impostazione di default è Set(): un set vuoto.

    • manifestFilePath: un percorso facoltativo per la generazione di file manifesto. Tutti i file che sono stati eliminati correttamente vengono registrati in Success.csv e quelli che non sono riusciti in Failed.csv

  • transformation_ctx: il contesto di trasformazione da usare (opzionale). Utilizzato nel percorso del file manifest.

glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})

transition_table

transition_table(database, table_name, transition_to, options={}, transformation_ctx="", catalog_id=None)

Esegue la transizione della classe di storage dei file archiviati su Amazon S3 per il database e la tabella del catalogo specificati.

Puoi eseguire la transizione tra due classi di archiviazione qualsiasi. Per le classi di archiviazione GLACIER e DEEP_ARCHIVE, puoi passare a queste classi. Tuttavia, dovresti utilizzare un S3 RESTORE per eseguire la transizione dalle classi di archiviazione GLACIER a DEEP_ARCHIVE.

Se esegui processi ETL AWS Glue che leggono file o partizioni da Amazon S3, puoi escludere alcuni tipi di classe di archiviazione Amazon S3. Per ulteriori informazioni, consulta Esclusione delle classi di archiviazione Amazon S3.

  • database: il database da usare.

  • table_name: il nome della tabella da usare.

  • transition_to: la classe di storage Amazon S3 in cui eseguire la transizione.

  • options: opzioni per filtrare i file da eliminare e per la generazione di file manifesto.

    • retentionPeriod: specifica un periodo in numero di ore per la conservazione dei file. I file più recenti del periodo di conservazione vengono mantenuti. Impostato su 168 ore (7 giorni) per impostazione predefinita.

    • partitionPredicate: le partizioni che soddisfano questo predicato vengono trasferite. I file all'interno del periodo di conservazione in queste partizioni non vengono passati. Impostato su "": vuoto per impostazione predefinita.

    • excludeStorageClasses: i file con classe di storage nel set excludeStorageClasses non vengono passati. L'impostazione di default è Set(): un set vuoto.

    • manifestFilePath: un percorso facoltativo per la generazione di file manifesto. Tutti i file che sono stati passati correttamente vengono registrati in Success.csv e quelli che non sono riusciti in Failed.csv

    • accountId: l'ID account Amazon Web Services per eseguire la trasformazione di transizione. Obbligatorio per questa trasformazione.

    • roleArn— Il AWS ruolo per eseguire la trasformazione di transizione. Obbligatorio per questa trasformazione.

  • transformation_ctx: il contesto di trasformazione da usare (opzionale). Utilizzato nel percorso del file manifest.

  • catalog_id: l'ID catalogo del catalogo dati a cui si accede (l'ID account del catalogo dati). Impostato su None per default. L'impostazione predefinita di None è l'ID catalogo dell'account chiamante nel servizio.

glueContext.transition_table("database", "table", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})

transition_s3_path

transition_s3_path(s3_path, transition_to, options={}, transformation_ctx="")

Esegue la transizione della classe di storage nel percorso Amazon S3 specificato in modo ricorsivo.

Puoi eseguire la transizione tra due classi di archiviazione qualsiasi. Per le classi di archiviazione GLACIER e DEEP_ARCHIVE, puoi passare a queste classi. Tuttavia, dovresti utilizzare un S3 RESTORE per eseguire la transizione dalle classi di archiviazione GLACIER a DEEP_ARCHIVE.

Se esegui processi ETL AWS Glue che leggono file o partizioni da Amazon S3, puoi escludere alcuni tipi di classe di archiviazione Amazon S3. Per ulteriori informazioni, consulta Esclusione delle classi di archiviazione Amazon S3.

  • s3_path: il percorso in Amazon S3 dei file da convertire nel formato s3://<bucket>/<prefix>/

  • transition_to: la classe di storage Amazon S3 in cui eseguire la transizione.

  • options: opzioni per filtrare i file da eliminare e per la generazione di file manifesto.

    • retentionPeriod: specifica un periodo in numero di ore per la conservazione dei file. I file più recenti del periodo di conservazione vengono mantenuti. Impostato su 168 ore (7 giorni) per impostazione predefinita.

    • partitionPredicate: le partizioni che soddisfano questo predicato vengono trasferite. I file all'interno del periodo di conservazione in queste partizioni non vengono passati. Impostato su "": vuoto per impostazione predefinita.

    • excludeStorageClasses: i file con classe di storage nel set excludeStorageClasses non vengono passati. L'impostazione di default è Set(): un set vuoto.

    • manifestFilePath: un percorso facoltativo per la generazione di file manifesto. Tutti i file che sono stati passati correttamente vengono registrati in Success.csv e quelli che non sono riusciti in Failed.csv

    • accountId: l'ID account Amazon Web Services per eseguire la trasformazione di transizione. Obbligatorio per questa trasformazione.

    • roleArn— Il AWS ruolo per eseguire la trasformazione di transizione. Obbligatorio per questa trasformazione.

  • transformation_ctx: il contesto di trasformazione da usare (opzionale). Utilizzato nel percorso del file manifest.

glueContext.transition_s3_path("s3://bucket/prefix/", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})

Estrazione in corso

extract_jdbc_conf

extract_jdbc_conf(connection_name, catalog_id = None)

Restituisce un dict con chiavi con le proprietà di configurazione dall'oggetto di connessione AWS Glue nel catalogo dati.

  • user: il nome utente del database.

  • password: la password del database.

  • vendor: specifica un fornitore (mysql, postgresql, oracle, sqlserver e così via).

  • enforceSSL: una stringa booleana che indica se è necessaria una connessione sicura.

  • customJDBCCert: utilizza un certificato client specifico dal percorso Amazon S3 indicato.

  • skipCustomJDBCCertValidation: una stringa booleana che indica se customJDBCCert deve essere convalidato da una CA.

  • customJDBCCertString: informazioni aggiuntive sul certificato personalizzato, specifico per il tipo di driver.

  • url (obsoleto): l'URL JDBC con solo protocollo, server e porta.

  • fullUrl: l'URL JDBC immesso al momento della creazione della connessione (disponibile in AWS Glueversione 3.0 o successive).

Esempio di recupero delle configurazioni JDBC:

jdbc_conf = glueContext.extract_jdbc_conf(connection_name="your_glue_connection_name") print(jdbc_conf) >>> {'enforceSSL': 'false', 'skipCustomJDBCCertValidation': 'false', 'url': 'jdbc:mysql://myserver:3306', 'fullUrl': 'jdbc:mysql://myserver:3306/mydb', 'customJDBCCertString': '', 'user': 'admin', 'customJDBCCert': '', 'password': '1234', 'vendor': 'mysql'}

Transazioni

start_transaction

start_transaction(read_only)

Avvia una nuova transazione. Chiama internamente l'API Lake Formation startTransaction.

  • read_only: (booleano) indica se questa transazione debba essere di sola lettura o lettura e scrittura. Le scritture effettuate utilizzando un ID transazione di sola lettura verranno rifiutate. Il commit delle transazioni di sola lettura non deve essere eseguito.

Restituisce l'ID transazione.

commit_transaction

commit_transaction(transaction_id, wait_for_commit = True)

Tenta di eseguire il commit della transazione specificata. commit_transaction può restituire prima che la transazione abbia terminato il commit. Chiama internamente l'API Lake Formation commitTransaction.

  • transaction_id : (stringa) la transazione di cui eseguire il commit.

  • wait_for_commit: (booleano) determina se il commit_transaction restituisce immediatamente. Il valore di default è true. Se false, commit_transaction effettua il polling e aspetta che sia stato eseguito il commit della transazione. Il tempo di attesa è limitato a 1 minuto utilizzando il backoff esponenziale con un massimo di 6 tentativi.

Restituisce un valore booleano per indicare se il commit sia stato eseguito o meno.

cancel_transaction

cancel_transaction(transaction_id)

Tenta di annullare la transazione specificata. Restituisce un'eccezione TransactionCommittedException se è stato precedentemente eseguito il commit della transazione. Richiama internamente l'CancelTransactionAPI Lake Formation.

  • transaction_id: (stringa) la transazione da annullare.

Scrittura

getSink

getSink(connection_type, format = None, transformation_ctx = "", **options)

Ottiene un oggetto DataSink che può essere utilizzato per scrivere DynamicFrames su fonti esterne. Verifica prima il format SparkSQL per essere certo di ricevere il sink previsto.

  • connection_type: il tipo di connessione da utilizzare, come Amazon S3, Amazon Redshift e JDBC. I valori validi includono s3mysql,postgresql,redshift,sqlserver, oraclekinesis, ekafka.

  • format: il formato SparkSQL da utilizzare (opzionale).

  • transformation_ctx: il contesto di trasformazione da usare (opzionale).

  • options: raccolta di coppie nome-valore utilizzate per specificare le opzioni di connessione. Alcuni dei valori possibili sono:

    • user e password: per l'autorizzazione

    • url: l'endpoint per il archivio dati

    • dbtable: il nome della tabella di destinazione

    • bulkSize: il grado di parallelismo per le operazioni di inserimento

Le opzioni che è possibile specificare dipendono dal tipo di connessione. Per ulteriori valori ed esempi, consulta Tipi di connessione e opzioni per ETL in AWS Glue per Spark.

Esempio:

>>> data_sink = context.getSink("s3") >>> data_sink.setFormat("json"), >>> data_sink.writeFrame(myFrame)

write_dynamic_frame_from_options

write_dynamic_frame_from_options(frame, connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

Legge e restituisce un DynamicFrame usando la connessione e il formato specificati.

  • frame: il DynamicFrame da scrivere.

  • connection_type: il tipo di connessione, come Amazon S3, Amazon Redshift e JDBC. I valori validi includono s3 mysqlpostgresql,redshift,sqlserver,oracle,kinesis, ekafka.

  • connection_options: opzioni di connessione, come tabella di database e percorso (opzionale). Per un connection_type di s3 è definito un percorso Amazon S3.

    connection_options = {"path": "s3://aws-glue-target/temp"}

    Per le connessioni JDBC, diverse proprietà devono essere definite. Il nome del database deve fare parte dell'URL. Puoi opzionalmente essere incluso nelle opzioni di connessione.

    avvertimento

    Si consiglia di non archiviare le password nello script. Valuta la possibilità boto3 di utilizzarli per recuperarli da AWS Secrets Manager o dal AWS Glue Data Catalog.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    La proprietà dbtable è il nome della tabella JDBC. Per i archivi dati JDBC che supportano schemi all'interno di un database, specifica schema.table-name. Se non viene fornito alcuno schema, viene usato lo schema "pubblico" predefinito.

    Per ulteriori informazioni, consulta Tipi di connessione e opzioni per ETL in AWS Glue per Spark.

  • format— Una specifica di formato. Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.

  • format_options: opzioni di formato per il formato specificato. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.

  • transformation_ctx: un contesto di trasformazione da usare (opzionale).

write_from_options

write_from_options(frame_or_dfc, connection_type, connection_options={}, format={}, format_options={}, transformation_ctx = "")

Scrive e restituisce un DynamicFrame o una DynamicFrameCollection creati con la connessione e le informazioni di formattazione specificati.

  • frame_or_dfc: il DynamicFrame o la DynamicFrameCollection per scrivere.

  • connection_type: il tipo di connessione, come Amazon S3, Amazon Redshift e JDBC. I valori validi sono s3, mysql, postgresql, redshift, sqlserver e oracle.

  • connection_options: opzioni di connessione, come tabella di database e percorso (opzionale). Per un connection_type di s3 è definito un percorso Amazon S3.

    connection_options = {"path": "s3://aws-glue-target/temp"}

    Per le connessioni JDBC, diverse proprietà devono essere definite. Il nome del database deve fare parte dell'URL. Puoi opzionalmente essere incluso nelle opzioni di connessione.

    avvertimento

    Si consiglia di non archiviare le password nello script. Valuta la possibilità boto3 di utilizzarli per recuperarli da AWS Secrets Manager o dal AWS Glue Data Catalog.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    La proprietà dbtable è il nome della tabella JDBC. Per i archivi dati JDBC che supportano schemi all'interno di un database, specifica schema.table-name. Se non viene fornito alcuno schema, viene usato lo schema "pubblico" predefinito.

    Per ulteriori informazioni, consulta Tipi di connessione e opzioni per ETL in AWS Glue per Spark.

  • format— Una specifica di formato. Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.

  • format_options: opzioni di formato per il formato specificato. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.

  • transformation_ctx: un contesto di trasformazione da usare (opzionale).

write_dynamic_frame_from_catalog

write_dynamic_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)

Scrive e restituisce un DynamicFrame utilizzando un database del catalogo dati e una tabella.

  • frame: il DynamicFrame da scrivere.

  • Database: il database del catalogo dati che contiene la tabella.

  • table_name: il nome della tabella del catalogo dati associata alla destinazione.

  • redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).

  • transformation_ctx: il contesto di trasformazione da usare (opzionale).

  • additional_options: una raccolta di coppie nome/valore opzionali.

  • catalog_id: l'ID catalogo (ID account) del catalogo dati a cui si accede. Se Nessuno, viene utilizzato l'ID account predefinito del chiamante.

write_data_frame_from_catalog

write_data_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)

Scrive e restituisce un DataFrame utilizzando un database del catalogo dati e una tabella. Questo metodo supporta la scrittura nei formati di data lake (Hudi, Iceberg e Delta Lake). Per ulteriori informazioni, consulta Utilizzo di framework data lake con processi ETL di AWS Glue.

  • frame: il DataFrame da scrivere.

  • Database: il database del catalogo dati che contiene la tabella.

  • table_name: il nome della tabella del catalogo dati associata alla destinazione.

  • redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).

  • transformation_ctx: il contesto di trasformazione da usare (opzionale).

  • additional_options: una raccolta di coppie nome/valore opzionali.

    • useSparkDataSink— Se impostato su true, forza AWS Glue a utilizzare l'API nativa Spark Data Sink per scrivere sulla tabella. Quando abiliti questa opzione, puoi aggiungere qualsiasi opzione Spark Data Source a seconda delle additional_options necessità. AWS Glue passa queste opzioni direttamente allo scrittore Spark.

  • catalog_id: l'ID catalogo (ID account) del catalogo dati a cui si accede. Se non specifichi un valore, verrà utilizzato l'ID account predefinito del chiamante.

Limitazioni

Quando utilizzi l'opzione useSparkDataSink considera le seguenti limitazioni:

  • L'opzione enableUpdateCatalog non è supportata quando si utilizza l'opzione useSparkDataSink.

Esempio: scrittura su una tabella Hudi utilizzando lo scrittore Spark Data Source

hudi_options = { 'useSparkDataSink': True, 'hoodie.table.name': <table_name>, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': <table_name>, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': <database_name>, 'hoodie.datasource.hive_sync.table': <table_name>, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms'} glueContext.write_data_frame.from_catalog( frame = <df_product_inserts>, database = <database_name>, table_name = <table_name>, additional_options = hudi_options )

write_dynamic_frame_from_jdbc_conf

write_dynamic_frame_from_jdbc_conf(frame, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)

Legge e restituisce un DynamicFrame usando le informazioni sulla connessione JDBC specificate.

  • frame: il DynamicFrame da scrivere.

  • catalog_connection: una connessione del catalogo da utilizzare.

  • connection_options: opzioni di connessione, come tabella di database e percorso (opzionale). Per ulteriori informazioni, consulta Tipi di connessione e opzioni per ETL in AWS Glue per Spark.

  • redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).

  • transformation_ctx: un contesto di trasformazione da usare (opzionale).

  • catalog_id: l'ID catalogo (ID account) del catalogo dati a cui si accede. Se Nessuno, viene utilizzato l'ID account predefinito del chiamante.

write_from_jdbc_conf

write_from_jdbc_conf(frame_or_dfc, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)

Legge e restituisce un DynamicFrame o una DynamicFrameCollection usando le informazioni sulla connessione JDBC specificate.

  • frame_or_dfc: il DynamicFrame o la DynamicFrameCollection per scrivere.

  • catalog_connection: una connessione del catalogo da utilizzare.

  • connection_options: opzioni di connessione, come tabella di database e percorso (opzionale). Per ulteriori informazioni, consulta Tipi di connessione e opzioni per ETL in AWS Glue per Spark.

  • redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).

  • transformation_ctx: un contesto di trasformazione da usare (opzionale).

  • catalog_id: l'ID catalogo (ID account) del catalogo dati a cui si accede. Se Nessuno, viene utilizzato l'ID account predefinito del chiamante.