View a markdown version of this page

Ejecute trabajos de Spark con Amazon EMR Serverless - FSx para ONTAP

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ejecute trabajos de Spark con Amazon EMR Serverless

Los equipos de ingeniería de datos que ejecutan cargas de trabajo de Spark (para el procesamiento de registros, la ingeniería de funciones, la ETL compleja o el análisis científico) suelen tener datos de origen en un volumen de FSx para ONTAP escritos por canalizaciones de ingestión locales, motores de datos de NFS o SMB, o aplicaciones que montan el volumen directamente.

Con un punto de acceso Amazon S3 conectado al volumen, Amazon EMR Serverless lee los datos a través del punto de acceso, ejecuta el trabajo de Spark en él y vuelve a escribir los resultados en el mismo volumen. Amazon EMR Serverless gestiona el ciclo de vida del clúster de forma automática: usted envía un trabajo y paga por los segundos que se ejecuta.

Este patrón se adapta a las cargas de trabajo que necesitan un tiempo de ejecución completo de Spark (bibliotecas personalizadas, algoritmos iterativos, transformaciones de larga duración o cuadernos interactivos a través de Amazon EMR Studio), donde las opciones más livianas (Amazon Athena para AWS Glue SQL y ETL gestionado) no son las adecuadas. Para obtener información sobre esas alternativas, consulte y. Consulte archivos con SQL mediante Amazon Athena Cree canalizaciones de ETL mediante AWS Glue

En este tutorial, simulará un equipo de meteorología que agrega un año de observaciones del Resumen del Día de la Superficie Global (GSOD) de la NOAA organizadas en un FSx para el volumen de ONTAP. Envía un PySpark trabajo en el que se leen los archivos CSV sin procesar, se calculan los agregados mensuales por estación (temperatura media, precipitación total y recuento de días con precipitaciones) y se escriben los resultados como Parquet divididos por meses, todo ello a través del punto de acceso.

nota

Este tutorial tarda aproximadamente entre 30 y 40 minutos en completarse. Los Servicios de AWS usuarios incurren en cargos por los recursos que cree. Si completa todos los pasos, incluida la sección de limpieza, con prontitud, el coste previsto será inferior a 1 dólar en la zona este de EE. UU. (Virginia del Norte) Región de AWS. Esta estimación no incluye los cargos continuos del FSx para el propio volumen de ONTAP.

Requisitos previos

  • Un volumen FSx para ONTAP con un punto de acceso Amazon S3 conectado. El punto de acceso debe tener un origen de red de Internet para que el servicio Amazon EMR Serverless pueda acceder a él. Para obtener instrucciones, consulte Creación de un punto de acceso.

  • AWS CLI versión 2 instalada y configurada con credenciales que pueden crear funciones de IAM y recursos de Amazon EMR Serverless.

Paso 1: Cargue el conjunto de datos de muestra en el punto de acceso

El conjunto de datos GSOD de la NOAA es un conjunto de datos público de observaciones meteorológicas diarias, un archivo CSV por estación y año. Para este tutorial, debe descargar un subconjunto de 100 estaciones del bucket público de noaa-gsod-pds Amazon S3 y subirlo a su punto de acceso.

  1. Descargue los primeros archivos de las 100 estaciones para 2024.

    $ mkdir -p ~/gsod && cd ~/gsod aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt while read f; do aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors done < files.txt ls | wc -l

    El comando descarga aproximadamente 100 archivos CSV con un total de entre 7 y 8 MB.

  2. Cargue los archivos en el punto de acceso con el prefijo. gsod/2024/ access-point-aliasSustitúyalos por el alias de tu punto de acceso.

    $ aws s3 cp ~/gsod/ "s3://access-point-alias/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors

Paso 2: Escribe el PySpark trabajo

El trabajo lee todos los archivos CSV con el prefijo de entrada, filtra los valores centinela que representan los datos faltantes, analiza el FRSHTT campo de bits (niebla, lluvia, nieve, granizo, trueno, tornado) para contar los días con eventos de precipitación, los agrega y devuelve el Parquet particionado al (station, month) punto de acceso.

  1. Guarda el siguiente script en un archivo con el nombre. gsod_monthly.py

    # gsod_monthly.py import sys from pyspark.sql import SparkSession from pyspark.sql import functions as F INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2] # GSOD sentinels for missing data TEMP_SENTINEL = 9999.9 PRCP_SENTINEL = 99.99 spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate() raw = spark.read.option("header", True).csv(INPUT_PATH) cleaned = (raw .select( F.col("STATION").alias("station"), F.col("NAME").alias("station_name"), F.col("LATITUDE").cast("double").alias("lat"), F.col("LONGITUDE").cast("double").alias("lon"), F.to_date("DATE", "yyyy-MM-dd").alias("date"), F.col("TEMP").cast("double").alias("temp_f"), F.col("PRCP").cast("double").alias("prcp_in"), F.col("FRSHTT").alias("frshtt"), ) .filter(F.col("temp_f") != TEMP_SENTINEL) .withColumn("month", F.date_format("date", "yyyy-MM")) .withColumn( "prcp_in", F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")), ) # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado. # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events. .withColumn( "had_precip_event", F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0), ) ) monthly = (cleaned .groupBy("station", "station_name", "lat", "lon", "month") .agg( F.avg("temp_f").alias("avg_temp_f"), F.min("temp_f").alias("min_temp_f"), F.max("temp_f").alias("max_temp_f"), F.sum("prcp_in").alias("total_prcp_in"), F.sum("had_precip_event").alias("precip_event_days"), F.count("*").alias("observation_days"), ) ) (monthly.write .mode("overwrite") .partitionBy("month") .parquet(OUTPUT_PATH)) spark.stop()
  2. Cargue el script en el punto de acceso con el scripts/ prefijo.

    $ aws s3 cp gsod_monthly.py "s3://access-point-alias/scripts/gsod_monthly.py"

Paso 3: Crear el rol de trabajo Amazon EMR Serverless

Amazon EMR Serverless asume una función de ejecución de IAM cuando ejecuta su trabajo. El rol necesita permisos para leer y escribir el punto de acceso y para escribir registros en Logs. CloudWatch Expanda la siguiente sección para ver los pasos de configuración.

  1. Guarde la siguiente política de confianza como. emr-trust-policy.json Permite que Amazon EMR Serverless asuma la función.

    { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "emr-serverless.amazonaws.com"}, "Action": "sts:AssumeRole" }] }
  2. Guarde la siguiente política de permisos como. emr-permissions.json Sustituya regionaccount-id, y access-point-name por sus valores.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams" ], "Resource": "*" }, { "Sid": "APRead", "Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "APWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" } ] }
  3. Cree el rol y adjunte la política.

    $ aws iam create-role --role-name fsxn-emr-job-role \ --assume-role-policy-document file://emr-trust-policy.json aws iam put-role-policy --role-name fsxn-emr-job-role \ --policy-name emr-access --policy-document file://emr-permissions.json

Paso 4: Crear e iniciar la aplicación Amazon EMR Serverless

Una aplicación Amazon EMR Serverless es un entorno informático de larga duración para una etiqueta de lanzamiento y un motor específicos (Spark o Hive). Le envías uno o más trabajos. Las aplicaciones amplían y reducen el cómputo automáticamente en función de la demanda de trabajo y permanecen inactivas cuando no hay ningún trabajo en ejecución.

  1. Cree una aplicación Spark con una versión reciente de Amazon EMR.

    $ aws emr-serverless create-application \ --name fsxn-emr-app --type SPARK --release-label emr-7.0.0

    Tenga en cuenta los applicationId en la respuesta.

  2. Inicie la aplicación. El arranque precalienta a un pequeño grupo de trabajadores para que el primer trabajo se ejecute sin demoras al arrancar en frío.

    $ aws emr-serverless start-application --application-id application-id

    Espera a que el estado se convierta. STARTED

    $ aws emr-serverless get-application --application-id application-id \ --query 'application.state'

Paso 5: Envía el trabajo de Spark

Envíe el trabajo con el ID de la aplicación y el rol de ejecución. El trabajo lee los CSV sin procesar gsod/2024/ y escribe en Parquet particionadogsod-monthly/, ambos a través del punto de acceso.

  1. Guarde la configuración del controlador de trabajo como. job-driver.json Sustituya los marcadores de posición.

    { "sparkSubmit": { "entryPoint": "s3://access-point-alias/scripts/gsod_monthly.py", "entryPointArguments": [ "s3://access-point-alias/gsod/2024/", "s3://access-point-alias/gsod-monthly/" ], "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2" } }
  2. Guarde la siguiente configuración de supervisión comojob-config.json. Envía los registros del controlador y del ejecutor a CloudWatch Logs.

    { "monitoringConfiguration": { "cloudWatchLoggingConfiguration": { "enabled": true, "logGroupName": "/aws/emr-serverless/fsxn-emr-app" } } }
  3. Envíe el trabajo.

    $ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn arn:aws:iam::account-id:role/fsxn-emr-job-role \ --name gsod-monthly \ --job-driver file://job-driver.json \ --configuration-overrides file://job-config.json

    Tenga en cuenta los jobRunId en la respuesta.

  4. Sondea el estado del trabajo. La transición laboral de SCHEDULED a RUNNING aSUCCESS.

    $ aws emr-serverless get-job-run \ --application-id application-id \ --job-run-id job-run-id \ --query 'jobRun.state'
nota

Si el trabajo falla, compruebe los registros del controlador en CloudWatch los registros del grupo de registros/aws/emr-serverless/fsxn-emr-app. Amazon EMR Serverless escribe un flujo de registro por ejecución de trabajo.

Paso 6: inspeccione el resultado

Compruebe que el trabajo haya escrito una partición de Parquet al mes y que el resultado sea legible.

  1. Enumere las particiones de salida.

    $ aws s3 ls "s3://access-point-alias/gsod-monthly/" --recursive

    Debería ver un archivo Parquet por month=YYYY-MM/ partición más un _SUCCESS marcador en la raíz.

  2. Lea una partición localmente para verificar el contenido.

    $ aws s3 cp "s3://access-point-alias/gsod-monthly/month=2024-06/" . \ --recursive --exclude "_SUCCESS" python3 -c "import pyarrow.parquet as pq; \ t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \ print(t.schema); print(t.to_pandas().head())"

    El esquema de salida incluye station station_namelat,lon,avg_temp_f,min_temp_f, max_temp_ftotal_prcp_in,precip_event_days, yobservation_days.

Extender el patrón

  • Consulta el resultado con Spark SQL. Registra la salida particionada como una tabla con AWS Glue Data Catalog y consúltala con Spark SQL, Athena o cualquier otra herramienta que AWS Glue lea tablas de catálogos. Para obtener instrucciones sobre cómo registrar un conjunto de datos respaldado por un punto de acceso, consulte. Consulte archivos con SQL mediante Amazon Athena

  • Usa Iceberg para escrituras con ACID. En el caso de las cargas de trabajo que actualizan o combinan datos, configure el trabajo para que escriba en una tabla de Iceberg en el punto de acceso, en lugar de hacerlo en Parquet. Amazon EMR Serverless incluye el tiempo de ejecución Iceberg de forma predeterminada en las etiquetas de las versiones recientes.

  • Ejecute de forma interactiva con Amazon EMR Studio. Adjunte un cuaderno Jupyter a la aplicación Amazon EMR Serverless para explorar los datos de forma interactiva. Consulte Cargas de trabajo interactivas con Amazon EMR Serverless en la Guía del usuario de Amazon EMR Serverless.

  • Programe el trabajo. Utilice Amazon EventBridge Scheduler o AWS Step Functions para ejecutar el trabajo de forma periódica (por ejemplo, cuando llegue un nuevo día de datos al volumen).

Resolución de problemas

Job falla con AccessDenied el punto de acceso

Compruebe que la política de funciones de trabajo concede s3:GetObject y s3:ListBucket en el ARN del punto de acceso (no en un bucket) y que el punto de acceso tiene un origen de red de Internet para que el servicio Amazon EMR Serverless pueda acceder a él.

Job se realiza correctamente pero la salida está vacía

Compruebe la ruta de entrada. Amazon S3 ListObjectsV2 trata los prefijos de forma literal, por lo que s3://alias/gsod/2024 (sin barra final) y s3://alias/gsod/2024/ (barra final) pueden comportarse de forma diferente. Incluya la barra final cuando apunte a un directorio de archivos.

Los registros de los controladores no están en los registros CloudWatch

La configuración de supervisión debe transferirse a --configuration-overrides la aplicaciónstart-job-run, no a ella. Cada trabajo que se ejecuta se escribe en su propio flujo de registros dentro del grupo de registros configurado.

Limpieza

Detenga y elimine la aplicación, elimine la función de IAM y elimine los datos cargados que ya no necesite.

$ aws emr-serverless stop-application --application-id application-id aws emr-serverless delete-application --application-id application-id aws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access aws iam delete-role --role-name fsxn-emr-job-role aws s3 rm "s3://access-point-alias/scripts/gsod_monthly.py" aws s3 rm "s3://access-point-alias/gsod/" --recursive aws s3 rm "s3://access-point-alias/gsod-monthly/" --recursive