

# Ejemplos de código Python en AWS Glue
<a name="aws-glue-programming-python-samples"></a>
+ [Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos](aws-glue-programming-python-samples-legislators.md)
+ [Ejemplo de código: Preparación de datos con ResolveChoice, Lambda y ApplyMapping](aws-glue-programming-python-samples-medicaid.md)

# Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos
<a name="aws-glue-programming-python-samples-legislators"></a>

En este ejemplo se utiliza un conjunto de datos que se ha descargado desde [http://everypolitician.org/](http://everypolitician.org/) al bucket `sample-dataset` de Amazon Simple Storage Service (Amazon S3): `s3://awsglue-datasets/examples/us-legislators/all`. El conjunto de datos contiene información en formato JSON sobre legisladores de los Estados Unidos y los escaños de los que han sido titulares en la Cámara de Representantes y el Senado. Para realizar este tutorial, este conjunto de datos se ha modificado ligeramente y está disponible en un bucket público de Amazon S3.

Puede encontrar el código fuente de este ejemplo en el archivo `join_and_relationalize.py` del [repositorio de ejemplos de AWS Glue](https://github.com/awslabs/aws-glue-samples) en el sitio web de GitHub.

Con estos datos, el presente tutorial le enseña a realizar las siguientes tareas:
+ Utilice un rastreador de AWS Glue para clasificar los objetos que están almacenados en un bucket público de Amazon S3 y guardar sus esquemas en el Catálogo de datos de AWS Glue.
+ Examinar los metadatos y los esquemas de la tabla que se obtienen a partir del rastreo.
+ Escriba un script de extracción, transferencia y carga (ETL) de Python que utilice los metadatos del Catálogo de datos para hacer lo siguiente:
  + Unir los datos de los diferentes archivos de origen juntos en una única tabla de datos (es decir, desnormalizar los datos).
  + Desglosar la tabla unida en diferentes tablas según el tipo de legislador.
  + Escribir los datos resultantes en archivos Apache Parquet independientes para realizar un análisis posteriormente.

La forma preferida de depurar scripts de Python o PySpark mientras se están ejecutando en AWS es usar [Notebooks en Glue Studio AWS](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html).

## Paso 1: Rastrear los datos del bucket de Amazon S3
<a name="aws-glue-programming-python-samples-legislators-crawling"></a>

1. Luego, inicie sesión en la Consola de administración de AWS y abra la consola de AWS Glue en [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. En función de los pasos que se indican en [Configuración de rastreadores](define-crawler.md), cree un nuevo rastreador que pueda rastrear el conjunto de datos `s3://awsglue-datasets/examples/us-legislators/all` en una base de datos denominada `legislators` en el Catálogo de datos de AWS Glue. Los datos de ejemplo ya están en este bucket público de Amazon S3.

1. Ejecute el nuevo rastreador y, a continuación, compruebe la base de datos `legislators`. 

   El rastreador crea las siguientes tablas de metadatos:
   + `persons_json`
   + `memberships_json`
   + `organizations_json`
   + `events_json`
   + `areas_json`
   + `countries_r_json`

   Esta es una colección seminormalizada de tablas que contienen legisladores y sus historias.

## Paso 2: Añadir un script reutilizable al cuaderno del punto de conexión de desarrollo
<a name="aws-glue-programming-python-samples-legislators-boilerplate"></a>

Pegue el script reutilizable siguiente en el cuaderno del punto de enlace de desarrollo para importar las bibliotecas de AWS Glue que necesite y configurar un único `GlueContext`:

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## Paso 3: Examinar los esquemas de los datos del Catálogo de datos
<a name="aws-glue-programming-python-samples-legislators-schemas"></a>

A continuación, puede crear un DynamicFrame con facilidad desde el Catálogo de datos de AWS Glue y examinar los esquemas de los datos. Por ejemplo, para ver el esquema de la tabla `persons_json`, añada lo siguiente en su cuaderno:

```
persons = glueContext.create_dynamic_frame.from_catalog(
             database="legislators",
             table_name="persons_json")
print "Count: ", persons.count()
persons.printSchema()
```

Esta es la salida de las llamadas impresas:

```
Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string
```

Cada persona de la tabla es miembro de algún órgano del congreso de los Estados Unidos.

Para ver el esquema de la tabla `memberships_json`, escriba lo siguiente:

```
memberships = glueContext.create_dynamic_frame.from_catalog(
                 database="legislators",
                 table_name="memberships_json")
print "Count: ", memberships.count()
memberships.printSchema()
```

La salida es la siguiente:

```
Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
```

Las `organizations` son partes y las dos cámaras del Congreso, el Senado y la Cámara de Representantes. Para ver el esquema de la tabla `organizations_json`, escriba lo siguiente:

```
orgs = glueContext.create_dynamic_frame.from_catalog(
           database="legislators",
           table_name="organizations_json")
print "Count: ", orgs.count()
orgs.printSchema()
```

La salida es la siguiente:

```
Count:  13
root
|-- classification: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
```

## Paso 4: Filtrar los datos
<a name="aws-glue-programming-python-samples-legislators-filtering"></a>

A continuación, mantenga solo los campos que desee y cambie el nombre `id` por `org_id`. El conjunto de datos es lo suficientemente pequeño para ver todo el conjunto. 

`toDF()` convierte a `DynamicFrame` en un elemento `DataFrame` de Apache Spark, por lo que puede aplicar las transformaciones que ya existen en Apache Spark SQL:

```
orgs = orgs.drop_fields(['other_names',
                        'identifiers']).rename_field(
                            'id', 'org_id').rename_field(
                               'name', 'org_name')
orgs.toDF().show()
```

El ejemplo siguiente muestra la salida:

```
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification|              org_id|            org_name|               links|seats|       type|               image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|         party|            party/al|                  AL|                null| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/democrat-li...|    Democrat-Liberal|[[website,http://...| null|       null|                null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|  435|lower house|                null|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|party/new_progres...|     New Progressive|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/popular_dem...|    Popular Democrat|[[website,http://...| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/republican-...|Republican-Conser...|[[website,http://...| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|   legislature|8fa6c3d2-71dc-478...|              Senate|                null|  100|upper house|                null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
```

Escriba lo siguiente para ver las `organizations` que aparecen en `memberships`:

```
memberships.select_fields(['organization_id']).toDF().distinct().show()
```

El ejemplo siguiente muestra la salida:

```
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
```

## Paso 5: Reunirlo todo
<a name="aws-glue-programming-python-samples-legislators-joining"></a>

Ahora utilice AWS Glue para unir estas tablas relacionales y crear una tabla del historial completo de `memberships` del legislador y sus `organizations` correspondientes.

1. En primer lugar, una `persons` y `memberships` en `id` y `person_id`.

1. A continuación, una el resultado con `orgs` en `org_id` y `organization_id`.

1. A continuación, anule los campos redundantes `person_id` y `org_id`.

Puede realizar todas estas operaciones en una línea de código (extendida):

```
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print "Count: ", l_history.count()
l_history.printSchema()
```

La salida es la siguiente:

```
Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
```

Ahora tiene la tabla definitiva que puede utilizar para su análisis. Puede escribirla en un formato compacto y eficiente para el análisis, por ejemplo, en Parquet, en el que puede ejecutar SQL en AWS Glue, Amazon Athena o Amazon Redshift Spectrum.

La siguiente llamada escribe la tabla en varios archivos para admitir lecturas paralelas rápidas al realizar el análisis posterior:

```
glueContext.write_dynamic_frame.from_options(frame = l_history,
          connection_type = "s3",
          connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"},
          format = "parquet")
```

Para poner todos los datos de historial en un único archivo, debe convertirlos en una estructura de datos, crear nuevas particiones y escribirlos:

```
s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
```

O, si desea separarlos por Senado y Cámara:

```
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part',
                               partitionBy=['org_name'])
```

## Paso 6: Transformar los datos para bases de datos relacionales
<a name="aws-glue-programming-python-samples-legislators-writing"></a>

AWS Glue facilita la tarea de escribir los datos en bases de datos relacionales como Amazon Redshift, incluso con datos semiestructurados. Ofrece una transformación `relationalize`, que aplana `DynamicFrames` sea cual sea la complejidad de los objetos de la trama.

Utilizando el elemento `l_history` `DynamicFrame` de este ejemplo, pase el nombre de una tabla (`hist_root`) y una ruta de flujo de trabajo temporal a `relationalize`. Esto devuelve un elemento `DynamicFrameCollection`. A continuación, puede enumerar los nombres de `DynamicFrames` en esa colección:

```
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/")
dfc.keys()
```

A continuación, se muestra la salida de la llamada `keys`:

```
[u'hist_root', u'hist_root_contact_details', u'hist_root_links',
 u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
```

`Relationalize` ha desglosado la tabla de historial en seis tablas nuevas: una tabla raíz que contiene un registro por cada objeto de `DynamicFrame` y tablas auxiliares para las matrices. A menudo, la gestión de matrices en las bases de datos relacionales no tiene un nivel óptimo, en particular cuando dichas matrices se hacen más grandes. Si se separan las matrices en diferentes tablas, las consultas serán mucho más rápidas.

A continuación, céntrese en la separación examinando `contact_details`:

```
l_history.select_fields('contact_details').printSchema()
dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
```

A continuación, se muestra la salida de la llamada `show`:

```
root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|                         |
| 10|    1|                        |             202-225-1314|
| 10|    2|                   phone|                         |
| 10|    3|                        |             202-225-3772|
| 10|    4|                 twitter|                         |
| 10|    5|                        |          MikeRossUpdates|
| 75|    0|                     fax|                         |
| 75|    1|                        |             202-225-7856|
| 75|    2|                   phone|                         |
| 75|    3|                        |             202-225-2711|
| 75|    4|                 twitter|                         |
| 75|    5|                        |                SenCapito|
+---+-----+------------------------+-------------------------+
```

El campo `contact_details` era una matriz de estructuras en el elemento `DynamicFrame` original. Cada elemento de estas matrices es una fila independiente de la tabla auxiliar, indizada por `index`. Aquí el `id` es una clave externa en la tabla `hist_root` con la clave `contact_details`:

```
dfc.select('hist_root').toDF().where(
    "contact_details = 10 or contact_details = 75").select(
       ['id', 'given_name', 'family_name', 'contact_details']).show()
```

Se genera la salida siguiente:

```
+--------------------+----------+-----------+---------------+
|                  id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...|      Mike|       Ross|             10|
|e3c60f34-7d1b-4c0...|   Shelley|     Capito|             75|
+--------------------+----------+-----------+---------------+
```

Observe en estos comandos que se utiliza `toDF()` y, a continuación, una expresión `where`, para filtrar y obtener las filas que desea ver.

Por lo tanto, unir la tabla `hist_root` con las tablas auxiliares le permite hacer lo siguiente:
+ Cargar datos en bases de datos sin compatibilidad de matrices.
+ Consultar cada elemento individual de una matriz con SQL.

Almacene las credenciales de Amazon Redshift y acceda a ellas de forma segura con una conexión de AWS Glue. Para obtener información acerca de cómo crear su propia conexión, consulte [Conexión a datos](glue-connections.md).

Ahora está preparado para escribir los datos en una conexión pasando por los elementos `DynamicFrames` de uno en uno:

```
for df_name in dfc.keys():
  m_df = dfc.select(df_name)
  print "Writing to table: ", df_name
  glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)
```

La configuración de la conexión variará en función del tipo de base de datos relacional: 
+ Para obtener instrucciones sobre cómo escribir en Amazon Redshift, consulte [Conexiones Redshift](aws-glue-programming-etl-connect-redshift-home.md).
+ Para otras bases de datos, consulte [Tipos de conexión y opciones para ETL en AWS Glue para Spark](aws-glue-programming-etl-connect.md).

## Conclusión
<a name="aws-glue-programming-python-samples-legislators-conclusion"></a>

En general, AWS Glue es muy flexible. Le permite realizar en unas cuantas líneas de código lo que normalmente tardaría varios días en escribirse. Puede encontrar todos los scripts de ETL de origen a destino en el archivo de Python `join_and_relationalize.py`, en los [ejemplos de AWS Glue](https://github.com/awslabs/aws-glue-samples) en GitHub.

# Ejemplo de código: Preparación de datos con ResolveChoice, Lambda y ApplyMapping
<a name="aws-glue-programming-python-samples-medicaid"></a>

El conjunto de datos que se utiliza en este ejemplo está formado por datos de pago de Medicare Provider que se descargaron de dos sitios de [Data.CMS.gov](https://data.cms.gov), conjuntos de datos: "Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011" e "Inpatient Charge Data FY 2011". Después de descargar los datos, modificamos el conjunto de datos para presentar un par de registros erróneos al final del archivo. Este archivo modificado se encuentra en un bucket público de Amazon S3 en `s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv`.

Puede encontrar el código fuente de este ejemplo en el archivo `data_cleaning_and_lambda.py` del repositorio GitHub de [ejemplos de AWS Glue](https://github.com/awslabs/aws-glue-samples).

La forma preferida de depurar scripts de Python o PySpark mientras se están ejecutando en AWS es usar [Notebooks en Glue Studio AWS](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html).

## Paso 1: Rastrear los datos del bucket de Amazon S3
<a name="aws-glue-programming-python-samples-medicaid-crawling"></a>

1. Inicie sesión en la Consola de administración de AWS y abra la consola de AWS Glue en [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. En función del proceso descrito en [Configuración de rastreadores](define-crawler.md), cree un rastreador nuevo que pueda rastrear el archivo `s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv` y poner los metadatos resultantes en una base de datos denominada `payments` en AWS Glue Data Catalog.

1. Ejecute el nuevo rastreador y, a continuación, compruebe la base de datos `payments`. Deberá ver que el rastreador ha creado en la base de datos una tabla de metadatos denominada `medicare` después de leer el comienzo del archivo para determinar su formato y delimitador.

   El esquema de la tabla `medicare` nueva es el siguiente:

   ```
   Column  name                            Data type
   ==================================================
   drg definition                             string
   provider id                                bigint
   provider name                              string
   provider street address                    string
   provider city                              string
   provider state                             string
   provider zip code                          bigint
   hospital referral region description       string
   total discharges                           bigint
   average covered charges                    string
   average total payments                     string
   average medicare payments                  string
   ```

## Paso 2: Añadir un script reutilizable al cuaderno del punto de conexión de desarrollo
<a name="aws-glue-programming-python-samples-medicaid-boilerplate"></a>

Pegue el script reutilizable siguiente en el cuaderno del punto de enlace de desarrollo para importar las bibliotecas de AWS Glue que necesite y configurar un único `GlueContext`:

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## Paso 3: Comparar diferentes análisis de esquemas
<a name="aws-glue-programming-python-samples-medicaid-schemas"></a>

A continuación, puede ver si el esquema que un elemento `DataFrame` de Apache Spark reconoció es el mismo que su rastreador AWS Glue ha registrado. Ejecute el código siguiente:

```
medicare = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").load(
   's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
medicare.printSchema()
```

Esta es la salida de la llamada `printSchema`:

```
root
 |-- DRG Definition: string (nullable = true)
 |-- Provider Id: string (nullable = true)
 |-- Provider Name: string (nullable = true)
 |-- Provider Street Address: string (nullable = true)
 |-- Provider City: string (nullable = true)
 |-- Provider State: string (nullable = true)
 |-- Provider Zip Code: integer (nullable = true)
 |-- Hospital Referral Region Description: string (nullable = true)
 |--  Total Discharges : integer (nullable = true)
 |--  Average Covered Charges : string (nullable = true)
 |--  Average Total Payments : string (nullable = true)
 |-- Average Medicare Payments: string (nullable = true)
```

A continuación, examine el esquema que un AWS Glue de `DynamicFrame` genera:

```
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
       database = "payments",
       table_name = "medicare")
medicare_dynamicframe.printSchema()
```

La salida de `printSchema` es la siguiente:

```
root
 |-- drg definition: string
 |-- provider id: choice
 |    |-- long
 |    |-- string
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

El elemento `DynamicFrame` genera un esquema donde `provider id` puede ser el tipo `long` o el tipo `string`. El esquema `DataFrame` indica que `Provider Id` es de tipo `string`, y el Data Catalog indica que `provider id` es de tipo `bigint`.

¿Cuál es el correcto? Hay dos registros al final del archivo (de 160 000 registros) con valores `string` en dicha columna. Estos son los registros erróneos que se introdujeron para ilustrar un problema.

Para abordar este tipo de problema, el `DynamicFrame` de AWS Glue introduce el concepto de tipo de *choice (elección)*. En este caso, `DynamicFrame` muestra que los valores `long` y `string` pueden aparecer en dicha columna. El rastreador de AWS Glue no tuvo en cuenta los valores `string`, ya que consideró solo un prefijo de 2 MB de los datos. El elemento `DataFrame` de Apache Spark tuvo en cuenta el conjunto de datos en su totalidad, pero se vio obligado a asignar el tipo más general a la columna, en este caso `string`. De hecho, Spark a menudo recurre al caso más general cuando hay tipos complejos o variaciones con las que no está familiarizado.

Para consultar la columna `provider id`, primero debe resolver el tipo de elección. Puede utilizar el método de transformación `resolveChoice` de `DynamicFrame` para convertir estos valores `string` en valores `long` con una opción `cast:long`:

```
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
medicare_res.printSchema()
```

Ahora la salida `printSchema` es:

```
root
 |-- drg definition: string
 |-- provider id: long
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

Cuando el valor era un elemento `string` que no se podía transformar, AWS Glue insertaba un valor `null`.

Otra opción consiste en convertir el tipo de elección en un elemento `struct`, que mantiene los valores de ambos tipos.

A continuación, examine las filas que eran anómalas:

```
medicare_res.toDF().where("'provider id' is NULL").show()
```

Verá lo siguiente:

```
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|      drg definition|provider id|  provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...|       null|            INC|       1050 DIVISION ST|      MAUSTON|            WI|            53948|                        WI - Madison|              12|              $11961.41|              $4619.00|                 $3775.33|
|948 - SIGNS & SYM...|       null| INC- ST JOSEPH|     5000 W CHAMBERS ST|    MILWAUKEE|            WI|            53210|                      WI - Milwaukee|              14|              $10514.28|              $5562.50|                 $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
```

Ahora elimine los dos registros mal formados, tal y como se indica a continuación:

```
medicare_dataframe = medicare_res.toDF()
medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
```

## Paso 4: Asignar los datos y utilizar las funciones Lambda de Apache Spark
<a name="aws-glue-programming-python-samples-medicaid-lambda-mapping"></a>

AWS Glue todavía no es compatible directamente con funciones Lambda, también conocidas como funciones definidas por el usuario. Pero siempre puede convertir un elemento `DynamicFrame` en un elemento `DataFrame` de Apache Spark y viceversa, para aprovechar la funcionalidad de Spark, además de las características especiales de `DynamicFrames`.

A continuación, convierta la información de pago en números, para que los motores de análisis como Amazon Redshift o Amazon Athena puedan controlar más deprisa sus números:

```
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

chop_f = udf(lambda x: x[1:], StringType())
medicare_dataframe = medicare_dataframe.withColumn(
        "ACC", chop_f(
            medicare_dataframe["average covered charges"])).withColumn(
                "ATP", chop_f(
                    medicare_dataframe["average total payments"])).withColumn(
                        "AMP", chop_f(
                            medicare_dataframe["average medicare payments"]))
medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
```

La salida de la llamada `show` es la siguiente:

```
+--------+-------+-------+
|     ACC|    ATP|    AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
```

Sigue habiendo cadenas en los datos. Podemos utilizar el potente método de transformación `apply_mapping` para rechazar, renombrar, difundir y anidar los datos para que otros lenguajes de programación y sistemas de datos puedan obtener fácilmente acceso a ellos:

```
from awsglue.dynamicframe import DynamicFrame
medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
                 ('provider id', 'long', 'provider.id', 'long'),
                 ('provider name', 'string', 'provider.name', 'string'),
                 ('provider city', 'string', 'provider.city', 'string'),
                 ('provider state', 'string', 'provider.state', 'string'),
                 ('provider zip code', 'long', 'provider.zip', 'long'),
                 ('hospital referral region description', 'string','rr', 'string'),
                 ('ACC', 'string', 'charges.covered', 'double'),
                 ('ATP', 'string', 'charges.total_pay', 'double'),
                 ('AMP', 'string', 'charges.medicare_pay', 'double')])
medicare_nest_dyf.printSchema()
```

La salida de `printSchema` es la siguiente:

```
root
 |-- drg: string
 |-- provider: struct
 |    |-- id: long
 |    |-- name: string
 |    |-- city: string
 |    |-- state: string
 |    |-- zip: long
 |-- rr: string
 |-- charges: struct
 |    |-- covered: double
 |    |-- total_pay: double
 |    |-- medicare_pay: double
```

Si vuelve a convertir los datos en un elemento `DataFrame` de Spark, puede mostrar cómo son ahora:

```
medicare_nest_dyf.toDF().show()
```

La salida es la siguiente:

```
+--------------------+--------------------+---------------+--------------------+
|                 drg|            provider|             rr|             charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...|    AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...|    AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...|    AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...|    AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...|    AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
```

## Paso 5: Escribir los datos en Apache Parquet
<a name="aws-glue-programming-python-samples-medicaid-writing"></a>

AWS Glue le facilita la tarea de escribir los datos en un formato como Apache Parquet que las bases de datos relacionales pueden utilizar de manera eficaz:

```
glueContext.write_dynamic_frame.from_options(
       frame = medicare_nest_dyf,
       connection_type = "s3",
       connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
       format = "parquet")
```