Anforderungen für den S3-optimierten EMRFS Committer - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Anforderungen für den S3-optimierten EMRFS Committer

Der EMRFS S3-optimierte Committer wird verwendet, wenn die folgenden Bedingungen erfüllt sind:

  • Sie führen Spark-Jobs aus, die Spark oder Datasets verwenden DataFrames, um Dateien in Amazon S3 zu schreiben. Ab Amazon EMR 6.4.0 kann dieser Committer für alle gängigen Formate verwendet werdenORC, einschließlich Parkett und textbasierte Formate (einschließlich CSV und). JSON Für Versionen vor Amazon EMR 6.4.0 wird nur das Parquet-Format unterstützt.

  • Mehrteilige Uploads sind in Amazon aktiviert. EMR Dies ist die Standardeinstellung. Weitere Informationen finden Sie unter Der EMRFS S3-optimierte Committer und mehrteilige Uploads.

  • Die integrierte Dateiformatunterstützung von Spark wird verwendet. Die integrierte Dateiformatunterstützung wird unter folgenden Umständen verwendet:

    • Bei Hive-Metastore-Tabellen spark.sql.hive.convertMetastoreParquet ist wann true für Parquet-Tabellen oder spark.sql.hive.convertMetastoreOrc true für Orc-Tabellen mit Amazon EMR 6.4.0 oder höher auf eingestellt. Dies sind die Standardeinstellungen.

    • Wenn Aufträge in Datenquellen oder Tabellen im Dateiformat schreiben, wird beispielsweise die Zieltabelle mit der USING parquet Klausel erstellt.

    • Wenn Aufträge in nicht partitionierte Hive-Metastore-Parquet-Tabellen schreiben. Eine bekannte Einschränkung besteht darin, dass die in Spark integrierte Parquet-Unterstützung keine partitionierten Hive-Tabellen unterstützt. Weitere Informationen finden Sie unter Konvertierung von Hive Metastore Parquet-Tabellen im Apache Spark and Datasets Guide. DataFrames

  • Spark-Job-Operationen, die beispielsweise ${table_location}/k1=v1/k2=v2/ in einen Standardspeicherort für Partitionen schreiben, verwenden den Committer. Der Committer wird nicht verwendet, wenn ein Job-Vorgang in einen benutzerdefinierten Partitionsspeicherort schreibt, z. B. wenn mit dem ALTER TABLE SQL-Befehl ein benutzerdefinierter Partitionsspeicherort festgelegt wurde.

  • Für Spark müssen die folgenden Werte verwendet werden:

    • Der Eigenschaft spark.sql.parquet.fs.optimized.committer.optimization-enabled muss auf true eingestellt sein. Dies ist die Standardeinstellung bei Amazon EMR 5.20.0 und höher. Bei Amazon EMR 5.19.0 ist der Standardwert. false Weitere Informationen zum Konfigurieren dieses Werts finden Sie unter Aktivieren Sie den EMRFS S3-optimierten Committer für Amazon 5.19.0 EMR.

    • Beim Schreiben in nicht partitionierte Hive-Metastore-Tabellen werden nur die Dateiformate Parquet und Orc unterstützt. spark.sql.hive.convertMetastoreParquetmuss auf gesetzt sein, true wenn in nicht partitionierte Parquet Hive-Metastore-Tabellen geschrieben werden. spark.sql.hive.convertMetastoreOrcmuss auf gesetzt sein, true wenn in nicht partitionierte Orc Hive Metastore-Tabellen geschrieben werden. Dies sind die Standardeinstellungen.

    • muss spark.sql.parquet.output.committer.class auf com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter festgelegt sein. Dies ist die Standardeinstellung.

    • spark.sql.sources.commitProtocolClassmuss auf oder gesetzt sein. org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocolist die Standardeinstellung für die Amazon EMR 5.x-Serie Version 5.30.0 und höher und für die Amazon EMR 6.x-Serie Version 6.2.0 und höher. org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocolist die Standardeinstellung für frühere EMR Amazon-Versionen.

    • Wenn Spark-Aufträge partitionierte Parquet-Datasets durch dynamische Partitionsspalten überschreiben, dann müssen die Schreiboption partitionOverwriteMode und spark.sql.sources.partitionOverwriteMode auf static eingestellt sein. Dies ist die Standardeinstellung.

      Anmerkung

      Die Schreiboption partitionOverwriteMode wurde mit Spark 2.4.0 eingeführt. Für Spark-Version 2.3.2, die in der EMR Amazon-Version 5.19.0 enthalten ist, legen Sie die Eigenschaft fest. spark.sql.sources.partitionOverwriteMode

Fälle, in denen der EMRFS S3-optimierte Committer nicht verwendet wird

Im Allgemeinen wird der EMRFS S3-optimierte Committer in den folgenden Situationen nicht verwendet.

Situation Warum der Committer nicht verwendet wird
Wenn Sie an schreiben HDFS Der Committer unterstützt nur das Schreiben in Amazon S3 mithilfe vonEMRFS.
Bei Verwendung des S3A-Dateisystems Der Committer unterstützt nur. EMRFS
Wenn Sie MapReduce oder Spark verwenden RDD API Der Committer unterstützt nur die Verwendung von SparkSQL, DataFrame, oder DatasetAPIs.

Die folgenden Scala-Beispiele zeigen einige zusätzliche Situationen, die verhindern, dass der EMRFS S3-optimierte Committer vollständig (das erste Beispiel) und teilweise (das zweite Beispiel) verwendet wird.

Beispiel – Dynamischer Partitionsüberschreibmodus

Das folgende Scala-Beispiel weist Spark an, einen anderen Commit-Algorithmus zu verwenden, wodurch die Verwendung des S3-optimierten Committers vollständig verhindert wird. EMRFS Der Code legt die Eigenschaft partitionOverwriteMode auf dynamic fest, sodass nur die Partitionen überschrieben werden, auf die Sie Daten schreiben. Anschließend werden dynamische Partitionsspalten durch partitionBy angegeben und der Schreibmodus ist auf overwrite eingestellt.

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://amzn-s3-demo-bucket1/output")

Sie müssen alle drei Einstellungen konfigurieren, um die Verwendung des S3-optimierten Committers zu vermeiden. EMRFS Wenn Sie dies tun, führt Spark einen anderen Commit-Algorithmus aus, der im Commit-Protokoll von Spark angegeben ist. Für Amazon EMR 5.x-Versionen vor 5.30.0 und für Amazon EMR 6.x-Versionen vor 6.2.0 verwendet das Commit-Protokoll das Staging-Verzeichnis von Spark, ein temporäres Verzeichnis, das unter dem Ausgabeverzeichnis erstellt wird, das mit beginnt. .spark-staging Der Algorithmus benennt Partitionsverzeichnisse nacheinander um, was sich negativ auf die Leistung auswirken kann. Weitere Informationen zu den EMR Amazon-Versionen 5.30.0 und höher sowie 6.2.0 und höher finden Sie unter. Verwenden Sie das S3-optimierte Commit-Protokoll EMRFS

Der Algorithmus in Spark 2.4.0 folgt diesen Schritten:

  1. Aufgabenversuche schreiben ihre Ausgabe in Partitionsverzeichnisse unterhalb des Staging-Verzeichnisses von Spark, beispielsweise ${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/.

  2. Für jede geschriebene Partition verfolgt der Aufgabenversuch die relativen Partitionspfade, z. B. k1=v1/k2=v2.

  3. Wenn eine Aufgabe erfolgreich abgeschlossen wurde, stellt sie dem Treiber alle zugehörigen Partitionspfade bereit, die von ihr nachverfolgt wurden.

  4. Nachdem alle Aufgaben abgeschlossen wurden, sammelt die Auftrags-Commit-Phase alle die Partitionsverzeichnisse, die von erfolgreichen Aufgabenversuchen unter dem Staging-Verzeichnis von Spark geschrieben wurden. Spark benennt jedes dieser Verzeichnisse mithilfe von Verzeichnisstruktur-Umbenennungsoperationen sequenziell bis zu ihrem endgültigen Ausgabespeicherort um.

  5. Das Staging-Verzeichnis wird gelöscht, bevor der Auftrags-Commit-Phase abgeschlossen ist.

Beispiel – Benutzerdefinierter Partitionsspeicherort

In diesem Beispiel wird der Scala-Code in zwei Partitionen eingefügt. Eine Partition verfügt über einen benutzerdefinierten Partitionsspeicherort. Die andere Partition verwendet den Standard-Partitionsspeicherort. Der EMRFS S3-optimierte Committer wird nur zum Schreiben der Aufgabenausgabe auf die Partition verwendet, die den Standardspeicherort der Partition verwendet.

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Der Scala-Code erstellt die folgenden Amazon-S3-Objekte:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$

Beim Schreiben in Partitionen an benutzerdefinierten Speicherorten verwendet Spark einen Commit-Algorithmus, ähnlich wie im vorherigen Beispiel. Dies wird im Folgenden beschrieben. Genau wie bei dem vorherigen Beispiel führt der Algorithmus zu sequenziellen Umbenennungen, wodurch die Leistung beeinträchtigt werden kann.

  1. Beim Schreiben von Ausgabe in eine Partition an einem benutzerdefinierten Speicherort werden Aufgaben in eine Datei unter dem Staging-Verzeichnis von Spark geschrieben, das unter dem endgültigen Ausgabespeicherort erstellt wird. Der Name der Datei enthält zum Schutz vor Dateikollisionen einen UUID zufälligen Wert. Der Aufgabe-Versuch verfolgt jede Datei zusammen mit dem gewünschten endgültigen Ausgabepfad nach.

  2. Wenn eine Aufgabe erfolgreich abgeschlossen wird, stellt sie dem Treiber die Dateien und die für sie gewünschten endgültigen Ausgabepfade bereit.

  3. Nachdem alle Aufgaben beendet wurden, benennt die Auftrags-Commit-Phase alle Dateien, die für Partitionen an benutzerdefinierten Speicherorten geschrieben wurden, sequentiell in ihre endgültigen Ausgabepfade um.

  4. Das Staging-Verzeichnis wird gelöscht, bevor der Auftrags-Commit-Phase abgeschlossen ist.