

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Requisiti per il protocollo di commit EMRFS S3-optimized
<a name="emr-spark-commit-protocol-reqs"></a>

Il protocollo di S3-optimized commit EMRFS viene utilizzato quando sono soddisfatte le seguenti condizioni:
+ Si eseguono job Spark che utilizzano Spark o Datasets per sovrascrivere DataFrames le tabelle partizionate.
+ Vengono eseguiti processi Spark la cui modalità di sovrascrittura delle partizioni è `dynamic`.
+ I caricamenti in più parti sono abilitati in Amazon EMR. Questa è l’impostazione predefinita. Per ulteriori informazioni, consulta [Il protocollo di commit EMRFS e i caricamenti in S3-optimized più parti](emr-spark-commit-protocol-multipart.md). 
+ La cache del file system per EMRFS è abilitata. Questa è l’impostazione predefinita. Verifica che l'impostazione `fs.s3.impl.disable.cache` sia impostata su `false`. 
+ Viene utilizzato il supporto integrato per le sorgenti dati di Spark. Built-in il supporto delle fonti di dati viene utilizzato nelle seguenti circostanze:
  + Quando i processi scrivono su origini dei dati o tabelle integrate.
  + Quando i processi scrivono tabelle Parquet del metastore Hive. Ciò accade quando `spark.sql.hive.convertInsertingPartitionedTable` e `spark.sql.hive.convertMetastoreParquet` sono entrambi impostati su true. Queste sono le impostazioni predefinite.
  + Quando i processi scrivono tabelle ORC del metastore Hive. Ciò accade quando `spark.sql.hive.convertInsertingPartitionedTable` e `spark.sql.hive.convertMetastoreOrc` sono entrambi impostati su `true`. Queste sono le impostazioni predefinite.
+ Le operazioni di processo Spark che scrivono in una posizione di partizione predefinita, ad esempio `${table_location}/k1=v1/k2=v2/`, usano il protocollo di commit. Il protocollo non viene utilizzato se un'operazione di processo scrive in una posizione di partizione personalizzata, ad esempio se un percorso di partizione personalizzato è impostato utilizzando il comando `ALTER TABLE SQL`.
+ Devono essere utilizzati i seguenti valori per Spark:
  + `spark.sql.sources.commitProtocolClass` deve essere impostato su `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol`. Questa è l'impostazione predefinita per Amazon EMR rilascio 5.30.0 e successivi, e rilascio 6.2.0 e successivi. 
  + L'opzione di scrittura `partitionOverwriteMode` o `spark.sql.sources.partitionOverwriteMode` deve essere impostata su `dynamic`. L'impostazione predefinita è `static`.
**Nota**  
L'opzione di scrittura `partitionOverwriteMode` è stata introdotta in Spark 2.4.0. Per Spark versione 2.3.2, incluso con Amazon EMR rilascio 5.19.0, imposta la proprietà `spark.sql.sources.partitionOverwriteMode`. 
  + Se i processi Spark sovrascrivono la tabella Parquet del metastore Hive, `spark.sql.hive.convertMetastoreParquet`, `spark.sql.hive.convertInsertingPartitionedTable` e `spark.sql.hive.convertMetastore.partitionOverwriteMode` devono essere impostati su `true`. Queste sono le impostazioni predefinite. 
  + Se i processi Spark sovrascrivono la tabella ORC del metastore Hive, `spark.sql.hive.convertMetastoreOrc`, `spark.sql.hive.convertInsertingPartitionedTable` e `spark.sql.hive.convertMetastore.partitionOverwriteMode` devono essere impostati su `true`. Queste sono le impostazioni predefinite.

**Example - Modalità di sovrascrittura dinamica delle partizioni**  
In questo esempio di Scala, viene attivata l'ottimizzazione. Innanzitutto, imposta la proprietà `partitionOverwriteMode` su `dynamic`. Questo sovrascrive solo le partizioni in cui stai scrivendo i dati. Quindi, specifichi le colonne delle partizioni dinamiche con `partitionBy` e imposti la modalità scrittura su `overwrite`.  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")                 // "overwrite" instead of "insert"
  .option("partitionOverwriteMode", "dynamic")  // "dynamic" instead of "static"  
  .partitionBy("dt")                            // partitioned data instead of unpartitioned data
  .parquet("s3://amzn-s3-demo-bucket1/output")    // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
```

## Quando non viene utilizzato il protocollo di S3-optimized commit EMRFS
<a name="emr-spark-commit-protocol-reqs-anti"></a>

In genere, il protocollo di S3-optimized commit EMRFS funziona allo stesso modo del protocollo di commit Spark predefinito open source,. `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol` L'ottimizzazione non si verifica nelle seguenti situazioni.


****  

| Situazione | Perché il protocollo di commit non viene utilizzato | 
| --- | --- | 
| Quando si scrive su HDFS | Il protocollo di commit supporta solo la scrittura su Amazon S3 utilizzando EMRFS. | 
| Quando si utilizza il file system S3A | Il protocollo di commit supporta solo EMRFS. | 
| Quando utilizzi l'API RDD MapReduce di Spark | Il protocollo di commit supporta solo l'utilizzo di API SparkSQL o Dataset. DataFrame | 
| Quando la sovrascrittura dinamica delle partizioni non viene attivata | Il protocollo di commit ottimizza solo i casi di sovrascrittura dinamica delle partizioni. Per altri casi, consulta la sezione [Utilizzate il committer EMRFS S3-optimized](emr-spark-s3-optimized-committer.md). | 

I seguenti esempi di Scala mostrano alcune situazioni aggiuntive a cui delega il protocollo di commit EMRFS S3-optimized . `SQLHadoopMapReduceCommitProtocol`

**Example - Modalità di sovrascrittura delle partizioni con posizione della partizione personalizzata**  
In questo esempio, i programmi Scala sovrascrivono due partizioni in modalità di sovrascrittura dinamica delle partizioni. Una partizione ha una posizione di partizione personalizzata. L'altra partizione usa il percorso di partizione predefinito. Il protocollo di S3-optimized commit EMRFS migliora solo la partizione che utilizza la posizione di partizione predefinita.  

```
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")

// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")

// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")

def asDate(text: String) = lit(text).cast("date")   
                       
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .createTempView(inputView)
  
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
  
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
```
Il codice Scala crea i seguenti oggetti Amazon S3:  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
La scrittura in posizioni di partizione personalizzate nelle versioni precedenti di Spark può causare la perdita di dati. In questo esempio, la partizione `dt='2019-01-28'` andrebbe persa. Per ulteriori dettagli, consultare [SPARK-35106](https://issues.apache.org/jira/browse/SPARK-35106). Questo problema è stato risolto in Amazon EMR rilascio 5.33.0 e successivi, esclusi i rilasci 6.0.x e 6.1.x.

Quando si scrive nelle partizioni in posizioni personalizzate, Spark utilizza un algoritmo di commit simile all'esempio precedente, che è descritto di seguito. Come nell'esempio precedente, l'algoritmo produce ridenominazioni sequenziali che possono influire negativamente sulle prestazioni.

L'algoritmo in Spark 2.4.0 segue questi passaggi:

1. Quando si scrive l'output in una partizione in una posizione personalizzata, le attività scrivono in un file nella directory di gestione temporanea di Spark, che viene creata nella posizione di output finale. Il nome del file include un UUID casuale per proteggere il file de collisioni. Il tentativo di attività tiene traccia di ogni file insieme al percorso di output finale desiderato.

1. Quando un'attività viene completata correttamente, fornisce al driver i file e i relativi percorsi di output finali desiderati.

1. Al termine di tutte le attività, la fase di commit dei lavori consente di rinominare in sequenza tutti i file scritti per le partizioni in percorsi personalizzati nei relativi percorsi di output finali.

1. La directory di gestione temporanea viene eliminata prima del completamento della fase di commit del processo.