

# Tutorial: Integrate with Apache Spark to import or export data
<a name="spark-integrating"></a>

Apache Spark is an open-source engine for large-scale data analytics. With Apache Spark, you can perform analytics on data stored in Amazon Keyspaces more efficiently. You can also use Amazon Keyspaces to provide applications with consistent, single-digit-millisecond read access to analytics data from Spark. The open-source Spark Cassandra Connector simplifies reading and writing data between Amazon Keyspaces and Spark.

Amazon Keyspaces support for the Spark Cassandra Connector streamlines running Cassandra workloads in Spark-based analytics pipelines by using a fully managed and serverless database service. With Amazon Keyspaces, you don’t need to worry about Spark competing for the same underlying infrastructure resources as your tables. Amazon Keyspaces tables scale up and down automatically based on your application traffic.

The following tutorial walks you through the steps and best practices required to read and write data to Amazon Keyspaces using the Spark Cassandra Connector. The tutorial demonstrates how to migrate data to Amazon Keyspaces by loading data from a file with the Spark Cassandra Connector and writing it to an Amazon Keyspaces table. Then, the tutorial shows how to read the data back from Amazon Keyspaces using the Spark Cassandra Connector. This is a typical pattern for running Cassandra workloads in Spark-based analytics pipelines.

**Topics**
+ [Prerequisites for establishing connections to Amazon Keyspaces with the Spark Cassandra Connector](spark-tutorial-prerequisites.md)
+ [Step 1: Configure Amazon Keyspaces for integration with the Apache Cassandra Spark Connector](spark-tutorial-step1.md)
+ [Step 2: Configure the Apache Cassandra Spark Connector](spark-tutorial-step2.md)
+ [Step 3: Create the application configuration file](spark-tutorial-step3.md)
+ [Step 4: Prepare the source data and the target table in Amazon Keyspaces](spark-tutorial-step4.md)
+ [Step 5: Write and read Amazon Keyspaces data using the Apache Cassandra Spark Connector](spark-tutorial-step5.md)
+ [Troubleshooting common errors when using the Spark Cassandra Connector with Amazon Keyspaces](spark-tutorial-step6.md)

# Prerequisites for establishing connections to Amazon Keyspaces with the Spark Cassandra Connector
<a name="spark-tutorial-prerequisites"></a>

Before you connect to Amazon Keyspaces with the Spark Cassandra Connector, make sure that you've installed the following components. Compatibility between Amazon Keyspaces and the Spark Cassandra Connector has been tested with these recommended versions:
+ Java version 8
+ Scala 2.12
+ Spark 3.4
+ Cassandra Connector 2.5 and higher
+ Cassandra driver 4.12

1. To install Scala, follow the instructions at [https://www.scala-lang.org/download/scala2.html](https://www.scala-lang.org/download/scala2.html).

1. To install Spark 3.4.1, follow this example.

   ```
   curl -o spark-3.4.1-bin-hadoop3.tgz -k https://dlcdn.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
   
   # Extract the archive.
   tar -zxvf spark-3.4.1-bin-hadoop3.tgz
   
   # Set the SPARK_HOME environment variable.
   export SPARK_HOME=$PWD/spark-3.4.1-bin-hadoop3
   ```

# Step 1: Configure Amazon Keyspaces for integration with the Apache Cassandra Spark Connector
<a name="spark-tutorial-step1"></a>

In this step, you confirm that the partitioner for your account is compatible with the Apache Spark Connector and set up the required IAM permissions. The following best practices help you to provision sufficient read/write capacity for the table.

1. Confirm that the `Murmur3Partitioner` partitioner is the default partitioner for your account. This partitioner is compatible with the Spark Cassandra Connector. For more information on partitioners and how to change them, see [Working with partitioners in Amazon Keyspaces](working-with-partitioners.md).

1. Set up the IAM permissions required for Apache Spark to access Amazon Keyspaces through interface VPC endpoints.
   + Assign read/write access to the user table and read access to the system tables, as shown in the following IAM policy example.
   + Populating the `system.peers` table with your available interface VPC endpoints is required for clients accessing Amazon Keyspaces with Spark over [VPC endpoints](https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html).

   ```
   {
      "Version":"2012-10-17",
      "Statement":[
         {
            "Effect":"Allow",
            "Action":[
               "cassandra:Select",
               "cassandra:Modify"
            ],
            "Resource":[
               "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable",
               "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
            ]
         },
         {
            "Sid":"ListVPCEndpoints",
            "Effect":"Allow",
            "Action":[
               "ec2:DescribeNetworkInterfaces",
               "ec2:DescribeVpcEndpoints"
            ],
            "Resource":"*"
         }
      ]
   }
   ```

1. Consider the following best practices to configure sufficient read/write throughput capacity for your Amazon Keyspaces table to support the traffic from the Spark Cassandra Connector. 
   + Start with on-demand capacity mode to test the scenario.
   + To optimize the cost of table throughput for production environments, use a rate limiter for traffic from the connector, and configure your table to use provisioned capacity with automatic scaling. For more information, see [Manage throughput capacity automatically with Amazon Keyspaces auto scaling](autoscaling.md).
   + You can use a fixed rate limiter that comes with the Cassandra driver. There are some [rate limiters tailored to Amazon Keyspaces](https://github.com/aws-samples/amazon-keyspaces-java-driver-helpers) in the [AWS samples](https://github.com/aws-samples) repo.
   + For more information about capacity management, see [Configure read/write capacity modes in Amazon Keyspaces](ReadWriteCapacityMode.md).
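As a rough starting point for provisioned capacity mode, you can estimate the write capacity that a Spark bulk load needs from its target row rate and average row size. The following Scala sketch is illustrative only; the helper name is hypothetical, and it assumes that one WCU covers one write per second for a row of up to 1 KB.

```scala
// Illustrative sizing helper (not part of any AWS API): estimate the
// provisioned write capacity units (WCUs) needed for a bulk load,
// assuming one WCU covers one write per second of up to 1 KB.
def estimateWcus(rowsPerSecond: Long, avgRowSizeBytes: Long): Long = {
  val wcusPerRow = math.max(1L, math.ceil(avgRowSizeBytes / 1024.0).toLong)
  rowsPerSecond * wcusPerRow
}

// For example, writing 5,000 rows per second at ~2 KB per row:
println(estimateWcus(5000, 2048)) // 10000
```

You can use an estimate like this as the starting target for auto scaling, and then adjust based on the CloudWatch metrics of the table.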

# Step 2: Configure the Apache Cassandra Spark Connector
<a name="spark-tutorial-step2"></a>

Apache Spark is a general-purpose compute platform that you can configure in different ways. To configure Spark and the Spark Cassandra Connector for integration with Amazon Keyspaces, we recommend that you start with the minimum configuration settings described in the following section, and then increase them later as appropriate for your workload.
+ **Create Spark partition sizes smaller than 8 MB.**

  In Spark, a *partition* is an atomic chunk of data that can be processed in parallel. When you write data to Amazon Keyspaces with the Spark Cassandra Connector, the smaller the Spark partition, the fewer records the task writes. If a Spark task encounters multiple errors, it fails after the designated number of retries has been exhausted. To avoid replaying large tasks and reprocessing a lot of data, keep the size of the Spark partition small. 
+ **Use a low concurrent number of writes per executor with a large number of retries.**

  Amazon Keyspaces returns insufficient capacity errors back to Cassandra drivers as operation timeouts. You can't address timeouts caused by insufficient capacity by changing the configured timeout duration because the Spark Cassandra Connector attempts to retry requests transparently using the `MultipleRetryPolicy`. To ensure that retries don’t overwhelm the driver’s connection pool, use a low concurrent number of writes per executor with a large number of retries. The following code snippet is an example of this.

  ```
  spark.cassandra.query.retry.count = 500
  spark.cassandra.output.concurrent.writes = 3
  ```
+ **Break down the total throughput and distribute it across multiple Cassandra sessions.**
  + The Cassandra Spark Connector creates one session for each Spark executor. Think of this session as the unit of scale when you determine the required throughput and number of connections.
  + When defining the number of cores per executor and the number of cores per task, start low and increase as needed.
  + Allow enough Spark task retries so that processing continues in the event of transient errors. After you become familiar with your application's traffic characteristics and requirements, we recommend setting `spark.task.maxFailures` to a bounded value.
  + For example, the following configuration can handle two concurrent tasks per executor, per session:

    ```
    spark.executor.instances = configurable -> number of executors for the session.
    spark.executor.cores = 2 -> Number of cores per executor.
    spark.task.cpus = 1 -> Number of cores per task.
    spark.task.maxFailures = -1
    ```
+ **Turn off batching.**
  + We recommend that you turn off batching to improve random access patterns. The following code snippet is an example of this.

    ```
    spark.cassandra.output.batch.size.rows = 1 (Default = None)
    spark.cassandra.output.batch.grouping.key = none (Default = Partition)
    spark.cassandra.output.batch.grouping.buffer.size = 100 (Default = 1000)
    ```
+ **Set `SPARK_LOCAL_DIRS` to a fast, local disk with enough space.**
  + By default, Spark saves map output files and resilient distributed datasets (RDDs) to a `/tmp` folder. Depending on your Spark host’s configuration, this can result in *No space left on device* errors. 
  + To set the `SPARK_LOCAL_DIRS` environment variable to a directory called `/example/spark-dir`, you can use the following command. 

    ```
    export SPARK_LOCAL_DIRS=/example/spark-dir
    ```
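To apply the partition-size guidance from the first setting above, you can estimate a partition count from your dataset size before writing. The following Scala sketch is a rough aid; the helper name and the 8 MB default are illustrative, not part of the connector API.

```scala
// Hypothetical helper: estimate how many Spark partitions keep each
// partition under a target size (8 MB by default, per the guidance above).
def estimatePartitions(datasetSizeBytes: Long,
                       targetPartitionBytes: Long = 8L * 1024 * 1024): Int =
  math.max(1, math.ceil(datasetSizeBytes.toDouble / targetPartitionBytes).toInt)

// A 100 MB dataset would need about 13 partitions:
println(estimatePartitions(100L * 1024 * 1024)) // 13
```

You could then pass the result to `DataFrame.repartition` before writing to Amazon Keyspaces.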

# Step 3: Create the application configuration file
<a name="spark-tutorial-step3"></a>

To use the open-source Spark Cassandra Connector with Amazon Keyspaces, you need to provide an application configuration file that contains the settings required to connect with the DataStax Java driver. You can use either service-specific credentials or the SigV4 plugin to connect.

If you haven't already done so, you need to convert the digital certificate used to create the TLS connection into a trustStore file. You can follow the detailed steps at [Before you begin](using_java_driver.md#using_java_driver.BeforeYouBegin) from the Java driver connection tutorial. Take note of the trustStore file path and password because you need this information when you create the application config file.

## Connect with SigV4 authentication
<a name="appconfig.sigv4"></a>

This section shows you an example `application.conf` file that you can use when connecting with AWS credentials and the SigV4 plugin. If you haven't already done so, you need to generate your IAM access keys (an access key ID and a secret access key) and save them in your AWS config file or as environment variables. For detailed instructions, see [Credentials required by the AWS CLI, the AWS SDK, or the Amazon Keyspaces SigV4 plugin for Cassandra client drivers](SigV4_credentials.md).

In the following example, replace the file path to your trustStore file, and replace the password.

```
datastax-java-driver {
    basic.contact-points = ["cassandra.us-east-1.amazonaws.com:9142"]
    basic.load-balancing-policy {
        class = DefaultLoadBalancingPolicy
        local-datacenter = us-east-1
        slow-replica-avoidance = false
    }
    basic.request {
        consistency = LOCAL_QUORUM
    }
    advanced {
        auth-provider = {
            class = software.aws.mcs.auth.SigV4AuthProvider
            aws-region = us-east-1
        }
        ssl-engine-factory {
            class = DefaultSslEngineFactory
            truststore-path = "path_to_file/cassandra_truststore.jks"
            truststore-password = "password"
            hostname-validation = false
        }
    }
    advanced.connection.pool.local.size = 3
}
```

Update and save this configuration file as `/home/user1/application.conf`. The following examples use this path.

## Connect with service-specific credentials
<a name="appconfig.ssc"></a>

 This section shows you an example `application.conf` file that you can use when connecting with service-specific credentials. If you haven't already done so, you need to generate service-specific credentials for Amazon Keyspaces. For detailed instructions, see [Create service-specific credentials for programmatic access to Amazon Keyspaces](programmatic.credentials.ssc.md).

In the following example, replace `username` and `password` with your own credentials. Also, replace the file path to your trustStore file, and replace the password.

```
datastax-java-driver {
    basic.contact-points = ["cassandra.us-east-1.amazonaws.com:9142"]
    basic.load-balancing-policy {
        class = DefaultLoadBalancingPolicy
        local-datacenter = us-east-1
    }
    basic.request {
        consistency = LOCAL_QUORUM
    }
    advanced {
        auth-provider = {
            class = PlainTextAuthProvider
            username = "username"
            password = "password"
            aws-region = "us-east-1"
        }
        ssl-engine-factory {
            class = DefaultSslEngineFactory
            truststore-path = "path_to_file/cassandra_truststore.jks"
            truststore-password = "password"
            hostname-validation = false
        }
        metadata = {
            schema {
                token-map.enabled = true
            }
        }
    }
}
```

Update and save this configuration file as `/home/user1/application.conf` to use with the code example.

## Connect with a fixed rate
<a name="appconfig.fixedrate"></a>

To enforce a fixed rate per Spark executor, you can define a request throttler that limits the number of requests per second. The Spark Cassandra Connector deploys one Cassandra session per executor. The following formula can help you achieve consistent throughput against a table. 

```
max-request-per-second * numberOfExecutors = total throughput against a table
```

You can add this example to the application config file that you created earlier.

```
datastax-java-driver {
  advanced.throttler {
    class = RateLimitingRequestThrottler

    max-requests-per-second = 3000
    max-queue-size = 30000
    drain-interval = 1 millisecond
  }
}
```

# Step 4: Prepare the source data and the target table in Amazon Keyspaces
<a name="spark-tutorial-step4"></a>

In this step, you create a source file with sample data and an Amazon Keyspaces table.

1. Create the source file. You can choose one of the following options:
   + For this tutorial, you use a comma-separated values (CSV) file with the name `keyspaces_sample_table.csv` as the source file for the data migration. The provided sample file contains a few rows of data for a table with the name `book_awards`.

     1. Download the sample CSV file (`keyspaces_sample_table.csv`) that is contained in the following archive file [samplemigration.zip](samples/samplemigration.zip). Unzip the archive and take note of the path to `keyspaces_sample_table.csv`.
   + If you want to follow along with your own CSV file to write data to Amazon Keyspaces, make sure that the data is randomized. Data that is read directly from a database or exported to flat files is typically ordered by the partition and primary key. Importing ordered data to Amazon Keyspaces can cause it to be written to smaller segments of Amazon Keyspaces partitions, which results in an uneven traffic distribution. This can lead to slower performance and higher error rates. 

     In contrast, randomizing data helps to take advantage of the built-in load balancing capabilities of Amazon Keyspaces by distributing traffic across partitions more evenly. There are various tools that you can use for randomizing data. For an example that uses the open-source tool [Shuf](https://en.wikipedia.org/wiki/Shuf), see [Step 2: Prepare the data to upload using DSBulk](dsbulk-upload-prepare-data.md) in the data migration tutorial. The following is an example that shows how to shuffle data as a `DataFrame`. 

     ```
     import org.apache.spark.sql.functions.rand

     val shuffledDF = dataframe.orderBy(rand())
     ```

1. Create the target keyspace and table in Amazon Keyspaces.

   1. Connect to Amazon Keyspaces using the `cqlsh-expansion`. For `cqlsh-expansion` installation instructions, see [Using the `cqlsh-expansion` to connect to Amazon Keyspaces](programmatic.cqlsh.md#using_cqlsh). 

      Replace the service endpoint in the following example with your own value.

      ```
      cqlsh-expansion cassandra.us-east-1.amazonaws.com 9142 --ssl
      ```

   1. Create a new keyspace with the name `catalog` as shown in the following example. 

      ```
      CREATE KEYSPACE catalog WITH REPLICATION = {'class': 'SingleRegionStrategy'};
      ```

   1. After the new keyspace has a status of available, use the following code to create the target table `book_awards`. To learn more about asynchronous resource creation and how to check if a resource is available, see [Check keyspace creation status in Amazon Keyspaces](keyspaces-create.md).

      ```
      CREATE TABLE catalog.book_awards (
         year int,
         award text,
         rank int, 
         category text,
         book_title text,
         author text, 
         publisher text,
         PRIMARY KEY ((year, award), category, rank)
         );
      ```
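When you later work with this table from Spark in Scala, it can help to model a row as a typed value. The following case class is a hypothetical sketch that mirrors the `book_awards` schema above; the connector doesn't require it, because it can also map rows by column name.

```scala
// Hypothetical row model mirroring catalog.book_awards. The partition key
// is (year, award) and the clustering columns are (category, rank).
case class BookAward(
  year: Int,
  award: String,
  rank: Int,
  category: String,
  book_title: String,
  author: String,
  publisher: String
)

// A sample row matching the tutorial data:
val sample = BookAward(2020, "Wolf", 1, "Non-Fiction",
  "History of Ideas", "Wang Xiulan", "Example Books")
println(sample.book_title) // History of Ideas
```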

# Step 5: Write and read Amazon Keyspaces data using the Apache Cassandra Spark Connector
<a name="spark-tutorial-step5"></a>

In this step, you start by loading the data from the sample file into a `DataFrame` with the Spark Cassandra Connector. Next, you write the data from the `DataFrame` into your Amazon Keyspaces table. You can also use this part independently, for example, to migrate data into an Amazon Keyspaces table. Finally, you read the data from your table into a `DataFrame` using the Spark Cassandra Connector. You can also use this part independently, for example, to read data from an Amazon Keyspaces table to perform data analytics with Apache Spark.

1. Start the Spark shell as shown in the following example. This example uses SigV4 authentication.

   ```
   ./spark-shell --files application.conf \
       --conf spark.cassandra.connection.config.profile.path=application.conf \
       --packages software.aws.mcs:aws-sigv4-auth-cassandra-java-driver-plugin:4.0.5,com.datastax.spark:spark-cassandra-connector_2.12:3.1.0 \
       --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
   ```

1. Import the Spark Cassandra Connector with the following code.

   ```
   import org.apache.spark.sql.cassandra._
   ```

1. To read data from the CSV file and store it in a `DataFrame`, you can use the following code example.

   ```
   var df = spark.read.option("header","true").option("inferSchema","true").csv("keyspaces_sample_table.csv")
   ```

   You can display the result with the following command.

   ```
   scala> df.show();
   ```

   The output should look similar to this.

   ```
   +----------------+----+-----------+----+------------------+--------------------+-------------+
   |           award|year|   category|rank|            author|          book_title|    publisher|
   +----------------+----+-----------+----+------------------+--------------------+-------------+
   |Kwesi Manu Prize|2020|    Fiction|   1|        Akua Mansa|   Where did you go?|SomePublisher|
   |Kwesi Manu Prize|2020|    Fiction|   2|       John Stiles|           Yesterday|Example Books|
   |Kwesi Manu Prize|2020|    Fiction|   3|        Nikki Wolf|Moving to the Cha...| AnyPublisher|
   |            Wolf|2020|Non-Fiction|   1|       Wang Xiulan|    History of Ideas|Example Books|
   |            Wolf|2020|Non-Fiction|   2|Ana Carolina Silva|       Science Today|SomePublisher|
   |            Wolf|2020|Non-Fiction|   3| Shirley Rodriguez|The Future of Sea...| AnyPublisher|
   |     Richard Roe|2020|    Fiction|   1| Alejandro Rosalez|         Long Summer|SomePublisher|
   |     Richard Roe|2020|    Fiction|   2|       Arnav Desai|             The Key|Example Books|
   |     Richard Roe|2020|    Fiction|   3|     Mateo Jackson|    Inside the Whale| AnyPublisher|
   +----------------+----+-----------+----+------------------+--------------------+-------------+
   ```

   You can confirm the schema of the data in the `DataFrame` as shown in the following example.

   ```
   scala> df.printSchema
   ```

   The output should look like this.

   ```
   root
   |-- award: string (nullable = true)
   |-- year: integer (nullable = true)
   |-- category: string (nullable = true)
   |-- rank: integer (nullable = true)
   |-- author: string (nullable = true)
   |-- book_title: string (nullable = true)
   |-- publisher: string (nullable = true)
   ```

1. Use the following command to write the data in the `DataFrame` to the Amazon Keyspaces table.

   ```
   df.write.cassandraFormat("book_awards", "catalog").mode("APPEND").save()
   ```

1. To confirm that the data was saved, you can read it back into a `DataFrame`, as shown in the following example.

   ```
   var newDf = spark.read.cassandraFormat("book_awards", "catalog").load()
   ```

   Then you can show the data that is now contained in the `DataFrame`.

   ```
   scala> newDf.show()
   ```

   The output of that command should look like this.

   ```
   +--------------------+------------------+----------------+-----------+-------------+----+----+
   |          book_title|            author|           award|   category|    publisher|rank|year|
   +--------------------+------------------+----------------+-----------+-------------+----+----+
   |         Long Summer| Alejandro Rosalez|     Richard Roe|    Fiction|SomePublisher|   1|2020|
   |    History of Ideas|       Wang Xiulan|            Wolf|Non-Fiction|Example Books|   1|2020|
   |   Where did you go?|        Akua Mansa|Kwesi Manu Prize|    Fiction|SomePublisher|   1|2020|
   |    Inside the Whale|     Mateo Jackson|     Richard Roe|    Fiction| AnyPublisher|   3|2020|
   |           Yesterday|       John Stiles|Kwesi Manu Prize|    Fiction|Example Books|   2|2020|
   |Moving to the Cha...|        Nikki Wolf|Kwesi Manu Prize|    Fiction| AnyPublisher|   3|2020|
   |The Future of Sea...| Shirley Rodriguez|            Wolf|Non-Fiction| AnyPublisher|   3|2020|
   |       Science Today|Ana Carolina Silva|            Wolf|Non-Fiction|SomePublisher|   2|2020|
   |             The Key|       Arnav Desai|     Richard Roe|    Fiction|Example Books|   2|2020|
   +--------------------+------------------+----------------+-----------+-------------+----+----+
   ```

# Troubleshooting common errors when using the Spark Cassandra Connector with Amazon Keyspaces
<a name="spark-tutorial-step6"></a>

If you're using Amazon Virtual Private Cloud and you connect to Amazon Keyspaces, the most common errors experienced when using the Spark connector are caused by the following configuration issues.
+ The IAM user or role used in the VPC lacks the required permissions to access the `system.peers` table in Amazon Keyspaces. For more information, see [Populating `system.peers` table entries with interface VPC endpoint information](vpc-endpoints.md#system_peers).
+ The IAM user or role lacks the required read/write permissions to the user table and read access to the system tables in Amazon Keyspaces. For more information, see [Step 1: Configure Amazon Keyspaces for integration with the Apache Cassandra Spark Connector](spark-tutorial-step1.md).
+ The Java driver configuration doesn't disable hostname verification when creating the SSL/TLS connection. For examples, see [Step 2: Configure the driver](using_java_driver.md#java_tutorial.driverconfiguration).

For detailed connection troubleshooting steps, see [My VPC endpoint connection doesn't work properly](troubleshooting.connecting.md#troubleshooting.connection.vpce).

In addition, you can use Amazon CloudWatch metrics to help you troubleshoot issues with your Spark Cassandra Connector configuration in Amazon Keyspaces. To learn more about using Amazon Keyspaces with CloudWatch, see [Monitoring Amazon Keyspaces with Amazon CloudWatch](monitoring-cloudwatch.md). 

The following section describes the most useful metrics to observe when you're using the Spark Cassandra Connector.

**PerConnectionRequestRateExceeded**  
Amazon Keyspaces has a quota of 3,000 requests per second, per connection. Each Spark executor establishes a connection with Amazon Keyspaces. Running multiple retries can exhaust your per-connection request rate quota. If you exceed this quota, Amazon Keyspaces emits a `PerConnectionRequestRateExceeded` metric in CloudWatch.   
If you see `PerConnectionRequestRateExceeded` events along with other system or user errors, it's likely that Spark is running multiple retries beyond the allotted number of requests per connection.  
If you see `PerConnectionRequestRateExceeded` events without other errors, then you might need to increase the number of connections in your driver settings to allow for more throughput, or you might need to increase the number of executors in your Spark job.
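As a sanity check, you can estimate the minimum number of connections needed to stay under the 3,000 requests-per-second-per-connection quota; because the connector opens one session per executor, this also suggests a lower bound on executors. The helper below is an illustrative sketch, not part of any driver API.

```scala
// Illustrative helper: minimum connections needed so that each stays
// under the 3,000 requests-per-second-per-connection quota.
def minConnections(targetRequestsPerSecond: Int, perConnectionQuota: Int = 3000): Int =
  math.max(1, math.ceil(targetRequestsPerSecond.toDouble / perConnectionQuota).toInt)

// To sustain 9,000 requests per second, you need at least 3 connections:
println(minConnections(9000)) // 3
```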

**StoragePartitionThroughputCapacityExceeded**  
Amazon Keyspaces has a per-partition quota of 1,000 WCUs or WRUs per second and 3,000 RCUs or RRUs per second. If you're seeing `StoragePartitionThroughputCapacityExceeded` CloudWatch events, it could indicate that the data wasn't randomized on load. For examples of how to shuffle data, see [Step 4: Prepare the source data and the target table in Amazon Keyspaces](spark-tutorial-step4.md).

## Common errors and warnings
<a name="common_errors_warnings"></a>

If you're using Amazon Virtual Private Cloud and you connect to Amazon Keyspaces, the Cassandra driver might issue a warning message about the control node itself in the `system.peers` table. For more information, see [Common errors and warnings](vpc-endpoints.md#vpc_troubleshooting). You can safely ignore this warning.