Seleziona le tue preferenze relative ai cookie

Utilizziamo cookie essenziali e strumenti simili necessari per fornire il nostro sito e i nostri servizi. Utilizziamo i cookie prestazionali per raccogliere statistiche anonime in modo da poter capire come i clienti utilizzano il nostro sito e apportare miglioramenti. I cookie essenziali non possono essere disattivati, ma puoi fare clic su \"Personalizza\" o \"Rifiuta\" per rifiutare i cookie prestazionali.

Se sei d'accordo, AWS e le terze parti approvate utilizzeranno i cookie anche per fornire utili funzionalità del sito, ricordare le tue preferenze e visualizzare contenuti pertinenti, inclusa la pubblicità pertinente. Per continuare senza accettare questi cookie, fai clic su \"Continua\" o \"Rifiuta\". Per effettuare scelte più dettagliate o saperne di più, fai clic su \"Personalizza\".

Aggiornamento dello schema e aggiunta di nuove partizioni nel Data Catalog utilizzando AWS Glue Processi ETL

Modalità Focus
Aggiornamento dello schema e aggiunta di nuove partizioni nel Data Catalog utilizzando AWS Glue Processi ETL - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Il processo di estrazione, trasformazione e caricamento (ETL) potrebbe creare nuove partizioni di tabella nell'archivio dati di destinazione. Lo schema del set di dati può evolversi e divergere dal AWS Glue Schema del catalogo dati nel tempo. AWS Glue I processi ETL ora forniscono diverse funzionalità che puoi utilizzare all'interno dello script ETL per aggiornare lo schema e le partizioni nel catalogo dati. Queste caratteristiche ti consentono di vedere i risultati del processo ETL nel catalogo dati, senza dover eseguire nuovamente il crawler.

Nuove partizioni

Se desideri visualizzare le nuove partizioni in AWS Glue Data Catalog, puoi effettuare una delle seguenti operazioni:

  • Al termine del processo, esegui nuovamente il crawler e visualizza le nuove partizioni sulla console al termine del crawler.

  • Al termine del processo, visualizza immediatamente le nuove partizioni sulla console, senza dover eseguire nuovamente il crawler. Puoi abilitare questa caratteristica aggiungendo alcune righe di codice allo script ETL, come mostrato negli esempi seguenti. Il codice utilizza l'argomento enableUpdateCatalog per indicare che il catalogo dati deve essere aggiornato durante l'esecuzione del processo quando vengono create nuove partizioni.

Metodo 1

Passare enableUpdateCatalog e partitionKeys in un argomento opzioni.

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Metodo 2

Passare enableUpdateCatalog e partitionKeys in getSink() e chiamare setCatalogInfo() sull'oggetto DataSink.

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)

Ora puoi creare nuove tabelle di catalogo, aggiornare le tabelle esistenti con uno schema modificato e aggiungere nuove partizioni di tabella nel Data Catalog utilizzando un AWS Glue Il processo ETL stesso, senza la necessità di rieseguire i crawler.

Aggiornamento dello schema della tabella

Se desideri sovrascrivere lo schema della tabella del catalogo dati, puoi eseguire una delle seguenti operazioni:

  • Al termine del processo, esegui nuovamente il crawler e assicurati che il crawler sia configurato per aggiornare anche la definizione della tabella. Visualizza le nuove partizioni sulla console insieme agli eventuali aggiornamenti dello schema, al termine del crawler. Per maggiori informazioni, consulta Configurazione di un crawler utilizzando l’API.

  • Al termine del processo, visualizza immediatamente lo schema modificato sulla console, senza dover eseguire nuovamente il crawler. Puoi abilitare questa caratteristica aggiungendo alcune righe di codice allo script ETL, come mostrato negli esempi seguenti. Il codice utilizza enableUpdateCatalog impostato su true, e anche updateBehavior impostato su UPDATE_IN_DATABASE, il che indica di sovrascrivere lo schema e aggiungere nuove partizioni nel catalogo dati durante l'esecuzione del processo.

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()

Puoi inoltre impostare il valore updateBehavior su LOG se desideri impedire che lo schema di tabella venga sovrascritto, ma se desidera comunque aggiungere le nuove partizioni. Il valore predefinito di updateBehavior è UPDATE_IN_DATABASE, quindi se non lo definisci esplicitamente, lo schema della tabella verrà sovrascritto.

Se enableUpdateCatalog non è impostato su true, indipendentemente da qualsiasi opzione selezionata per updateBehavior, il processo ETL non aggiornerà la tabella nel catalogo dati.

Creazione di nuove tabelle

Puoi inoltre utilizzare le stesse opzioni per creare una nuova tabella nel catalogo dati. Puoi specificare il database e il nome della nuova tabella utilizzando setCatalogInfo.

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)

Restrizioni

Prestare attenzione alle seguenti restrizioni:

  • Sono supportate solo le destinazioni di Amazon Simple Storage Service (Amazon S3).

  • La funzionalità enableUpdateCatalog non è supportata per le tabelle governate.

  • Sono supportati solo i seguenti formati: json, csv, avro, e parquet.

  • Per creare o aggiornare tabelle con la parquet classificazione, è necessario utilizzare AWS Glue scrittrice per parquet ottimizzata per DynamicFrames. È possibile farlo in uno dei modi seguenti:

    • Se stai aggiornando una tabella esistente nel catalogo con la classificazione parquet, la proprietà della tabella "useGlueParquetWriter" deve essere impostata su true prima di aggiornarla. È possibile impostare questa proprietà tramite AWS Glue APIs /SDK, tramite la console o tramite un'istruzione Athena DDL.

      Campo di modifica delle proprietà della tabella del catalogo nella console. AWS Glue

      Una volta impostata la proprietà della tabella del catalogo, puoi utilizzare il seguente frammento di codice per aggiornare la tabella del catalogo con i nuovi dati:

      glueContext.write_dynamic_frame.from_catalog( frame=frameToWrite, database="dbName", table_name="tableName", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } )
    • Se la tabella non esiste ancora nel catalogo, puoi utilizzare il metodo getSink() nello script con connection_type="s3" per aggiungere la tabella e le sue partizioni al catalogo, oltre a scrivere i dati su Amazon S3. Fornisci i valori appropriati di partitionKeys e compression per il tuo flusso di lavoro.

      s3sink = glueContext.getSink( path="s3://bucket/folder/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName", catalogTableName="tableName" ) s3sink.setFormat("parquet", useGlueParquetWriter=True) s3sink.writeFrame(frameToWrite)
    • Il valore di glueparquet formato è un metodo obsoleto per abilitare il AWS Glue parquet writer.

  • Quando updateBehavior è impostato su LOG, nuove partizioni verranno aggiunte solo se lo schema DynamicFrame è equivalente o contiene un sottoinsieme delle colonne definite nello schema della tabella del catalogo dati.

  • Gli aggiornamenti dello schema non sono supportati per le tabelle non partizionate (che non utilizzano l'opzione “partitionKeys”).

  • Le PartitionKeys devono essere equivalenti, e nello stesso ordine, tra il parametro passato nello script ETL e le PartitionKeys nello schema della tabella del catalogo dati.

  • Al momento questa funzionalità non supporta ancora l'aggiornamento/creazione di tabelle in cui gli schemi di aggiornamento sono nidificati (ad esempio, array all'interno di strutture).

Per ulteriori informazioni, consulta Script di programmazione Spark.

PrivacyCondizioni del sitoPreferenze cookie
© 2025, Amazon Web Services, Inc. o società affiliate. Tutti i diritti riservati.