Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Anforderungen für den S3-optimierten EMRFS Committer
Der EMRFS S3-optimierte Committer wird verwendet, wenn die folgenden Bedingungen erfüllt sind:
-
Sie führen Spark-Jobs aus, die Spark oder Datasets verwenden DataFrames, um Dateien in Amazon S3 zu schreiben. Ab Amazon EMR 6.4.0 kann dieser Committer für alle gängigen Formate verwendet werdenORC, einschließlich Parkett und textbasierte Formate (einschließlich CSV und). JSON Für Versionen vor Amazon EMR 6.4.0 wird nur das Parquet-Format unterstützt.
-
Mehrteilige Uploads sind in Amazon aktiviert. EMR Dies ist die Standardeinstellung. Weitere Informationen finden Sie unter Der EMRFS S3-optimierte Committer und mehrteilige Uploads.
-
Die integrierte Dateiformatunterstützung von Spark wird verwendet. Die integrierte Dateiformatunterstützung wird unter folgenden Umständen verwendet:
-
Bei Hive-Metastore-Tabellen
spark.sql.hive.convertMetastoreParquet
ist wanntrue
für Parquet-Tabellen oderspark.sql.hive.convertMetastoreOrc
true
für Orc-Tabellen mit Amazon EMR 6.4.0 oder höher auf eingestellt. Dies sind die Standardeinstellungen. -
Wenn Aufträge in Datenquellen oder Tabellen im Dateiformat schreiben, wird beispielsweise die Zieltabelle mit der
USING parquet
Klausel erstellt. -
Wenn Aufträge in nicht partitionierte Hive-Metastore-Parquet-Tabellen schreiben. Eine bekannte Einschränkung besteht darin, dass die in Spark integrierte Parquet-Unterstützung keine partitionierten Hive-Tabellen unterstützt. Weitere Informationen finden Sie unter Konvertierung von Hive Metastore Parquet-Tabellen
im Apache Spark and Datasets Guide. DataFrames
-
-
Spark-Job-Operationen, die beispielsweise
${table_location}/k1=v1/k2=v2/
in einen Standardspeicherort für Partitionen schreiben, verwenden den Committer. Der Committer wird nicht verwendet, wenn ein Job-Vorgang in einen benutzerdefinierten Partitionsspeicherort schreibt, z. B. wenn mit demALTER TABLE SQL
-Befehl ein benutzerdefinierter Partitionsspeicherort festgelegt wurde. -
Für Spark müssen die folgenden Werte verwendet werden:
-
Der Eigenschaft
spark.sql.parquet.fs.optimized.committer.optimization-enabled
muss auftrue
eingestellt sein. Dies ist die Standardeinstellung bei Amazon EMR 5.20.0 und höher. Bei Amazon EMR 5.19.0 ist der Standardwert.false
Weitere Informationen zum Konfigurieren dieses Werts finden Sie unter Aktivieren Sie den EMRFS S3-optimierten Committer für Amazon 5.19.0 EMR. -
Beim Schreiben in nicht partitionierte Hive-Metastore-Tabellen werden nur die Dateiformate Parquet und Orc unterstützt.
spark.sql.hive.convertMetastoreParquet
muss auf gesetzt sein,true
wenn in nicht partitionierte Parquet Hive-Metastore-Tabellen geschrieben werden.spark.sql.hive.convertMetastoreOrc
muss auf gesetzt sein,true
wenn in nicht partitionierte Orc Hive Metastore-Tabellen geschrieben werden. Dies sind die Standardeinstellungen. -
muss
spark.sql.parquet.output.committer.class
aufcom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
festgelegt sein. Dies ist die Standardeinstellung. -
spark.sql.sources.commitProtocolClass
muss auf oder gesetzt sein.org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
ist die Standardeinstellung für die Amazon EMR 5.x-Serie Version 5.30.0 und höher und für die Amazon EMR 6.x-Serie Version 6.2.0 und höher.org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
ist die Standardeinstellung für frühere EMR Amazon-Versionen. -
Wenn Spark-Aufträge partitionierte Parquet-Datasets durch dynamische Partitionsspalten überschreiben, dann müssen die Schreiboption
partitionOverwriteMode
undspark.sql.sources.partitionOverwriteMode
aufstatic
eingestellt sein. Dies ist die Standardeinstellung.Anmerkung
Die Schreiboption
partitionOverwriteMode
wurde mit Spark 2.4.0 eingeführt. Für Spark-Version 2.3.2, die in der EMR Amazon-Version 5.19.0 enthalten ist, legen Sie die Eigenschaft fest.spark.sql.sources.partitionOverwriteMode
-
Fälle, in denen der EMRFS S3-optimierte Committer nicht verwendet wird
Im Allgemeinen wird der EMRFS S3-optimierte Committer in den folgenden Situationen nicht verwendet.
Situation | Warum der Committer nicht verwendet wird |
---|---|
Wenn Sie an schreiben HDFS | Der Committer unterstützt nur das Schreiben in Amazon S3 mithilfe vonEMRFS. |
Bei Verwendung des S3A-Dateisystems | Der Committer unterstützt nur. EMRFS |
Wenn Sie MapReduce oder Spark verwenden RDD API | Der Committer unterstützt nur die Verwendung von SparkSQL, DataFrame, oder DatasetAPIs. |
Die folgenden Scala-Beispiele zeigen einige zusätzliche Situationen, die verhindern, dass der EMRFS S3-optimierte Committer vollständig (das erste Beispiel) und teilweise (das zweite Beispiel) verwendet wird.
Beispiel – Dynamischer Partitionsüberschreibmodus
Das folgende Scala-Beispiel weist Spark an, einen anderen Commit-Algorithmus zu verwenden, wodurch die Verwendung des S3-optimierten Committers vollständig verhindert wird. EMRFS Der Code legt die Eigenschaft partitionOverwriteMode
auf dynamic
fest, sodass nur die Partitionen überschrieben werden, auf die Sie Daten schreiben. Anschließend werden dynamische Partitionsspalten durch partitionBy
angegeben und der Schreibmodus ist auf overwrite
eingestellt.
val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://
amzn-s3-demo-bucket1
/output")
Sie müssen alle drei Einstellungen konfigurieren, um die Verwendung des S3-optimierten Committers zu vermeiden. EMRFS Wenn Sie dies tun, führt Spark einen anderen Commit-Algorithmus aus, der im Commit-Protokoll von Spark angegeben ist. Für Amazon EMR 5.x-Versionen vor 5.30.0 und für Amazon EMR 6.x-Versionen vor 6.2.0 verwendet das Commit-Protokoll das Staging-Verzeichnis von Spark, ein temporäres Verzeichnis, das unter dem Ausgabeverzeichnis erstellt wird, das mit beginnt. .spark-staging
Der Algorithmus benennt Partitionsverzeichnisse nacheinander um, was sich negativ auf die Leistung auswirken kann. Weitere Informationen zu den EMR Amazon-Versionen 5.30.0 und höher sowie 6.2.0 und höher finden Sie unter. Verwenden Sie das S3-optimierte Commit-Protokoll EMRFS
Der Algorithmus in Spark 2.4.0 folgt diesen Schritten:
-
Aufgabenversuche schreiben ihre Ausgabe in Partitionsverzeichnisse unterhalb des Staging-Verzeichnisses von Spark, beispielsweise
${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/
. -
Für jede geschriebene Partition verfolgt der Aufgabenversuch die relativen Partitionspfade, z. B.
k1=v1/k2=v2
. -
Wenn eine Aufgabe erfolgreich abgeschlossen wurde, stellt sie dem Treiber alle zugehörigen Partitionspfade bereit, die von ihr nachverfolgt wurden.
-
Nachdem alle Aufgaben abgeschlossen wurden, sammelt die Auftrags-Commit-Phase alle die Partitionsverzeichnisse, die von erfolgreichen Aufgabenversuchen unter dem Staging-Verzeichnis von Spark geschrieben wurden. Spark benennt jedes dieser Verzeichnisse mithilfe von Verzeichnisstruktur-Umbenennungsoperationen sequenziell bis zu ihrem endgültigen Ausgabespeicherort um.
-
Das Staging-Verzeichnis wird gelöscht, bevor der Auftrags-Commit-Phase abgeschlossen ist.
Beispiel – Benutzerdefinierter Partitionsspeicherort
In diesem Beispiel wird der Scala-Code in zwei Partitionen eingefügt. Eine Partition verfügt über einen benutzerdefinierten Partitionsspeicherort. Die andere Partition verwendet den Standard-Partitionsspeicherort. Der EMRFS S3-optimierte Committer wird nur zum Schreiben der Aufgabenausgabe auf die Partition verwendet, die den Standardspeicherort der Partition verwendet.
val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)
Der Scala-Code erstellt die folgenden Amazon-S3-Objekte:
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
Beim Schreiben in Partitionen an benutzerdefinierten Speicherorten verwendet Spark einen Commit-Algorithmus, ähnlich wie im vorherigen Beispiel. Dies wird im Folgenden beschrieben. Genau wie bei dem vorherigen Beispiel führt der Algorithmus zu sequenziellen Umbenennungen, wodurch die Leistung beeinträchtigt werden kann.
-
Beim Schreiben von Ausgabe in eine Partition an einem benutzerdefinierten Speicherort werden Aufgaben in eine Datei unter dem Staging-Verzeichnis von Spark geschrieben, das unter dem endgültigen Ausgabespeicherort erstellt wird. Der Name der Datei enthält zum Schutz vor Dateikollisionen einen UUID zufälligen Wert. Der Aufgabe-Versuch verfolgt jede Datei zusammen mit dem gewünschten endgültigen Ausgabepfad nach.
-
Wenn eine Aufgabe erfolgreich abgeschlossen wird, stellt sie dem Treiber die Dateien und die für sie gewünschten endgültigen Ausgabepfade bereit.
-
Nachdem alle Aufgaben beendet wurden, benennt die Auftrags-Commit-Phase alle Dateien, die für Partitionen an benutzerdefinierten Speicherorten geschrieben wurden, sequentiell in ihre endgültigen Ausgabepfade um.
-
Das Staging-Verzeichnis wird gelöscht, bevor der Auftrags-Commit-Phase abgeschlossen ist.