Requisiti per il committer ottimizzato per EMRFS S3 - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Requisiti per il committer ottimizzato per EMRFS S3

Il committer EMRFS ottimizzato per S3 viene utilizzato quando sono soddisfatte le seguenti condizioni:

  • Esegui job Spark che utilizzano Spark o Datasets per scrivere file su Amazon S3. DataFrames A partire da Amazon EMR 6.4.0, questo committer può essere utilizzato per tutti i formati più comuni, incluso il parquetORC, e i formati basati su testo (incluso e). CSV JSON Per le versioni precedenti ad Amazon EMR 6.4.0, è supportato solo il formato Parquet.

  • I caricamenti multiparte sono abilitati in Amazon. EMR Questa è l'impostazione predefinita. Per ulteriori informazioni, consulta Il committer EMRFS ottimizzato per S3 e i caricamenti multipart.

  • Viene utilizzato il supporto integrato per i formati di file di Spark. Il supporto integrato per i formati viene utilizzato nelle seguenti circostanze:

    • Per le tabelle metastore Hive, quando spark.sql.hive.convertMetastoreParquet è impostato su true per le tabelle Parquet o spark.sql.hive.convertMetastoreOrc è impostato su true per le tabelle Orc con Amazon EMR 6.4.0 o versioni successive. Queste sono le impostazioni predefinite.

    • Quando i processi scrivono in origini dei dati o tabelle di formato di file, ad esempio la tabella di destinazione viene creata con la clausola USING parquet.

    • Quando i processi scrivono tabelle Parquet non partizionate Hive metastore. Il supporto per Parquet integrato di Spark non supporta le tabelle Hive partizionate, che è una limitazione nota. Per ulteriori informazioni, consulta Hive metastore Parquet table conversion in Apache Spark e Datasets Guide. DataFrames

  • Le operazioni di processo Spark che scrivono in una posizione di partizione predefinita, ad esempio ${table_location}/k1=v1/k2=v2/, usano il committer. Il committer non viene utilizzato se un'operazione di processo scrive in una posizione di partizione personalizzata, ad esempio se un percorso di partizione personalizzato è impostato utilizzando il comando ALTER TABLE SQL.

  • Devono essere utilizzati i seguenti valori per Spark:

    • La proprietà spark.sql.parquet.fs.optimized.committer.optimization-enabled deve essere impostata su true. Questa è l'impostazione predefinita con Amazon EMR 5.20.0 e versioni successive. Con Amazon EMR 5.19.0, il valore predefinito è. false Per informazioni su come configurare questo valore, consulta Abilita il committer EMRFS ottimizzato per S3 per Amazon 5.19.0 EMR.

    • Se si scrive su tabelle metastore Hive non partizionate, sono supportati solo i formati di file Parquet e Orc. spark.sql.hive.convertMetastoreParquetdeve essere impostato su true se si scrive su tabelle metastore di Parquet Hive non partizionate. spark.sql.hive.convertMetastoreOrcdeve essere impostato su true se si scrive su tabelle metastore di Orc Hive non partizionate. Queste sono le impostazioni predefinite.

    • spark.sql.parquet.output.committer.class deve essere impostato su com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter. Si tratta dell'impostazione di default.

    • spark.sql.sources.commitProtocolClassdeve essere impostato su o. org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocolè l'impostazione predefinita per la versione EMR 5.30.0 e successive della serie Amazon 5.x e per la serie Amazon EMR 6.x versione 6.2.0 e successive. org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocolè l'impostazione predefinita per le EMR versioni precedenti di Amazon.

    • Se i processi Spark sovrascrivono i set di dati Parquet partizionati con colonne di partizione dinamiche, le opzioni di scrittura partitionOverwriteMode e spark.sql.sources.partitionOverwriteMode devono essere impostate su static. Si tratta dell'impostazione di default.

      Nota

      L'opzione di scrittura partitionOverwriteMode è stata introdotta in Spark 2.4.0. Per la versione 2.3.2 di Spark, inclusa nella EMR versione Amazon 5.19.0, imposta la proprietà. spark.sql.sources.partitionOverwriteMode

Occasioni in cui non viene utilizzato un committer ottimizzato EMRFS per S3

In genere, il committer EMRFS ottimizzato per S3 non viene utilizzato nelle seguenti situazioni.

Situazione Perché il committer non viene utilizzato
Quando scrivi a HDFS Il committer supporta solo la scrittura su Amazon EMRFS S3 utilizzando.
Quando si utilizza il file system S3A Il committer supporta solo. EMRFS
Quando usi MapReduce o Spark RDD API Il committer supporta solo l'uso di Spark o SQL DataFrame Dataset. APIs

I seguenti esempi di Scala mostrano alcune situazioni aggiuntive che impediscono l'utilizzo totale e parziale del committer EMRFS ottimizzato per S3 (il primo esempio) e in parte (il secondo esempio).

Esempio - Modalità di sovrascrittura dinamica delle partizioni

Il seguente esempio di Scala indica a Spark di utilizzare un algoritmo di commit diverso, che impedisce del tutto l'uso del committer ottimizzato per S3. EMRFS Il codice imposta la proprietà partitionOverwriteMode su dynamic per sovrascrivere solo le partizioni su cui si stanno scrivendo i dati. Quindi, le colonne delle partizioni dinamiche vengono specificate da partitionBy e la modalità scrittura è impostata su overwrite.

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://amzn-s3-demo-bucket1/output")

È necessario configurare tutte e tre le impostazioni per evitare di utilizzare il committer ottimizzato per S3. EMRFS In questo modo, Spark esegue un algoritmo di commit diverso specificato nel protocollo di commit di Spark. Per le versioni di Amazon EMR 5.x precedenti alla 5.30.0 e per le versioni di Amazon EMR 6.x precedenti alla 6.2.0, il protocollo commit utilizza la staging directory di Spark, che è una directory temporanea creata nella posizione di output che inizia con. .spark-staging L'algoritmo rinomina in modo sequenziale le directory delle partizioni e questo può influire negativamente sulle prestazioni. Per ulteriori informazioni sulle EMR versioni di Amazon 5.30.0 e successive e 6.2.0 e successive, consulta. Usa il protocollo di commit ottimizzato per S3 EMRFS

L'algoritmo in Spark 2.4.0 segue questi passaggi:

  1. I tentativi di attività scrivono il loro output nelle directory delle partizioni sotto la directory di staging di Spark, ad esempio ${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/.

  2. Per ogni partizione scritta, il tentativo di attività tiene traccia dei percorsi di partizione relativi, ad esempio k1=v1/k2=v2.

  3. Quando un'attività viene completata correttamente, fornisce al driver tutti i relativi percorsi di partizione che ha tracciato.

  4. Al termine di tutte le attività, la fase di commit dei lavori raccoglie tutte le directory delle partizioni che i tentativi di attività riusciti hanno scritto nella directory di gestione temporanea di Spark. Spark rinomina in sequenza ciascuna di queste directory nella sua posizione di output finale utilizzando le operazioni di ridenominazione dell'albero delle directory.

  5. La directory di gestione temporanea viene eliminata prima del completamento della fase di commit del processo.

Esempio - Posizione della partizione personalizzata

In questo esempio, il codice Scala viene inserito in due partizioni. Una partizione ha una posizione di partizione personalizzata. L'altra partizione usa il percorso di partizione predefinito. Il committer EMRFS ottimizzato per S3 viene utilizzato solo per scrivere l'output delle attività nella partizione che utilizza la posizione di partizione predefinita.

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Il codice Scala crea i seguenti oggetti Amazon S3:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$

Quando si scrive nelle partizioni in posizioni personalizzate, Spark utilizza un algoritmo di commit simile all'esempio precedente, che è descritto di seguito. Come nell'esempio precedente, l'algoritmo produce ridenominazioni sequenziali che possono influire negativamente sulle prestazioni.

  1. Quando si scrive l'output in una partizione in una posizione personalizzata, le attività scrivono in un file nella directory di gestione temporanea di Spark, che viene creata nella posizione di output finale. Il nome del file include un codice casuale per la protezione dalle collisioni UUID di file. Il tentativo di attività tiene traccia di ogni file insieme al percorso di output finale desiderato.

  2. Quando un'attività viene completata correttamente, fornisce al driver i file e i relativi percorsi di output finali desiderati.

  3. Al termine di tutte le attività, la fase di commit dei lavori consente di rinominare in sequenza tutti i file scritti per le partizioni in percorsi personalizzati nei relativi percorsi di output finali.

  4. La directory di gestione temporanea viene eliminata prima del completamento della fase di commit del processo.