Scrittura di un'applicazione Spark - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Scrittura di un'applicazione Spark

Le applicazioni Spark possono essere scritte in Scala, Java o Python. Vari esempi di applicazioni Spark sono disponibili nell'argomento Spark examples (Esempi Spark) nella documentazione di Apache Spark. L'esempio di stima del numero Pi è illustrato di seguito nelle tre applicazioni supportate a livello nativo. Puoi anche visualizzare esempi completi in $SPARK_HOME/examples e su. GitHub Per ulteriori informazioni su come creare JARs per Spark, consulta l'argomento Quick start nella documentazione di Apache Spark.

Scala

Per evitare problemi di compatibilità con Scala, ti suggeriamo di utilizzare le dipendenze Spark per la versione di Scala corretta quando compili un'applicazione Spark per un cluster Amazon. EMR La versione Scala che dovresti utilizzare dipende dalla versione di Spark installata nel tuo cluster. Ad esempio, la EMR versione 5.30.1 di Amazon utilizza Spark 2.4.5, che è stato creato con Scala 2.11. Se il tuo cluster utilizza la EMR versione Amazon 5.30.1, usa le dipendenze Spark per Scala 2.11. Per ulteriori informazioni sulle versioni di Scala utilizzate da Spark, consulta la Documentazione di Apache Spark.

package org.apache.spark.examples import scala.math.random import org.apache.spark._ /** Computes an approximation to pi */ object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow val count = spark.parallelize(1 until n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / n) spark.stop() } }

Java

package org.apache.spark.examples; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import java.util.ArrayList; import java.util.List; /** * Computes an approximation to pi * Usage: JavaSparkPi [slices] */ public final class JavaSparkPi { public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; int n = 100000 * slices; List<Integer> l = new ArrayList<Integer>(n); for (int i = 0; i < n; i++) { l.add(i); } JavaRDD<Integer> dataSet = jsc.parallelize(l, slices); int count = dataSet.map(new Function<Integer, Integer>() { @Override public Integer call(Integer integer) { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; return (x * x + y * y < 1) ? 1 : 0; } }).reduce(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }); System.out.println("Pi is roughly " + 4.0 * count / n); jsc.stop(); } }

Python

import argparse import logging from operator import add from random import random from pyspark.sql import SparkSession logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") def calculate_pi(partitions, output_uri): """ Calculates pi by testing a large number of random numbers against a unit circle inscribed inside a square. The trials are partitioned so they can be run in parallel on cluster instances. :param partitions: The number of partitions to use for the calculation. :param output_uri: The URI where the output is written, typically an Amazon S3 bucket, such as 's3://example-bucket/pi-calc'. """ def calculate_hit(_): x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x**2 + y**2 < 1 else 0 tries = 100000 * partitions logger.info( "Calculating pi with a total of %s tries in %s partitions.", tries, partitions ) with SparkSession.builder.appName("My PyPi").getOrCreate() as spark: hits = ( spark.sparkContext.parallelize(range(tries), partitions) .map(calculate_hit) .reduce(add) ) pi = 4.0 * hits / tries logger.info("%s tries and %s hits gives pi estimate of %s.", tries, hits, pi) if output_uri is not None: df = spark.createDataFrame([(tries, hits, pi)], ["tries", "hits", "pi"]) df.write.mode("overwrite").json(output_uri) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "--partitions", default=2, type=int, help="The number of parallel partitions to use when calculating pi.", ) parser.add_argument( "--output_uri", help="The URI where output is saved, typically an S3 bucket." ) args = parser.parse_args() calculate_pi(args.partitions, args.output_uri)