DataFrames provide a new API for manipulating data within Spark. They offer a more user-friendly experience than pure Scala for common queries. The Spark Cassandra Connector provides an integrated DataSource to make creating Cassandra DataFrames easy.
Spark Docs: Data Sources, Data Frames
DataSources in Spark take a map of options which define how the source should act. The Connector provides a CassandraSource which recognizes the following key/value pairs. Options with a default of N/A are required; all others are optional.
Option Key | Controls | Values | Default |
---|---|---|---|
table | The Cassandra table to connect to | String | N/A |
keyspace | The keyspace where the table is looked for | String | N/A |
cluster | The group of the Cluster Level Settings to inherit | String | "default" |
pushdown | Enables pushing down predicates to C* when applicable | (true,false) | true |
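For example, a read that supplies the two required options and explicitly disables predicate pushdown could look like the following minimal sketch (the keyspace and table names match the test.words examples used later in this document):
val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table"    -> "words",  // required
    "keyspace" -> "test",   // required
    "pushdown" -> "false"   // optional, defaults to "true"
  ))
  .load()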
####Reading, Writing, and CassandraConnector Options
Any normal Spark Connector configuration options for connecting, reading, or writing can be passed through as DataFrame options as well. When using the read command below, these options should appear exactly the same as when set in the SparkConf.
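For instance, a connector property such as spark.cassandra.input.split.size_in_mb can be supplied on a single read; a minimal sketch (the value 64 is only illustrative, and a fuller example appears further below):
val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .option("spark.cassandra.input.split.size_in_mb", "64") // illustrative value
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load()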
####Setting Cluster and Keyspace Level Options
The connector also provides a way to describe the options which should be applied to all DataFrames within a cluster or within a keyspace. When a property has been specified at the table level it will override the default keyspace or cluster property.
To add these properties, add keys to your SparkConf in the format clusterName:keyspaceName/propertyName.
Example Changing Cluster/Keyspace Level Properties
val conf = new SparkConf()
  .set("ClusterOne/spark.cassandra.input.split.size_in_mb", "32")
  .set("default:test/spark.cassandra.input.split.size_in_mb", "128")
...
val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load() // This DataFrame will use a spark.cassandra.input.split.size_in_mb of 128
val otherdf = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words", "keyspace" -> "test", "cluster" -> "ClusterOne"))
  .load() // This DataFrame will use a spark.cassandra.input.split.size_in_mb of 32
val lastdf = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table" -> "words",
    "keyspace" -> "test",
    "cluster" -> "ClusterOne",
    "spark.cassandra.input.split.size_in_mb" -> "48"
  ))
  .load() // This DataFrame will use a spark.cassandra.input.split.size_in_mb of 48
###Creating DataFrames using Read Commands
The most programmatic way to create a DataFrame is to invoke a read command on the SQLContext. This will build a DataFrameReader. Specify the format as org.apache.spark.sql.cassandra. You can then use options to pass a Map[String,String] of options as described above. Then finish by calling load to actually get a DataFrame.
Example Creating a DataFrame using a Read Command
val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load()
df.show
//word count
//cat 30
//fox 40
###Creating DataFrames using Spark SQL
Accessing DataFrames using Spark SQL involves creating temporary tables and specifying the source as org.apache.spark.sql.cassandra. The OPTIONS passed to this table are used to establish a relation between the Cassandra table and the internally used DataSource. Because of a limitation in SparkSQL, the OPTIONS keys must have their . characters replaced with _. This means spark.cassandra.input.split.size_in_mb becomes spark_cassandra_input_split_size_in_mb.
Example Creating a Source Using Spark SQL:
//Create Relation with the cassandra table test.words
scala> sqlContext.sql(
"""CREATE TEMPORARY TABLE words
|USING org.apache.spark.sql.cassandra
|OPTIONS (
| table "words",
| keyspace "test",
| cluster "Test Cluster",
| pushdown "true"
|)""".stripMargin)
scala> val df = sqlContext.sql("SELECT * FROM words")
scala> df.show()
//word count
//cat 30
//fox 40
scala> df.filter(df("count") > 30).show
//word count
//fox 40
In addition, you can use Spark SQL on the registered table:
sqlContext.sql("SELECT * FROM words WHERE word = 'fox'").collect
//Array[org.apache.spark.sql.Row] = Array([fox,40])
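A DataFrame built with the read command can also be registered and queried the same way; a minimal sketch, assuming the df from the read example above and Spark 1.x's registerTempTable (the temporary table name here is arbitrary):
// Register the DataFrame created earlier and query it with Spark SQL
df.registerTempTable("words_by_read")
sqlContext.sql("SELECT word, count FROM words_by_read WHERE count > 30").show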
###Persisting a DataFrame to Cassandra Using the Save Command
DataFrames provide a save function which allows them to persist their data to another DataSource. The connector supports using this feature to persist a DataFrame to a Cassandra table.
Example Copying Between Two Tables Using DataFrames
val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "words", "keyspace" -> "test" ))
.load()
df.write
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "words_copy", "keyspace" -> "test"))
.save()
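When writing into a table that already holds data, you may also want to set the save mode explicitly; a minimal sketch, assuming the standard Spark SaveMode API is honored by the Cassandra source:
import org.apache.spark.sql.SaveMode

// Assumption: SaveMode.Append adds rows to test.words_copy rather than
// erroring when the target table already exists
df.write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("table" -> "words_copy", "keyspace" -> "test"))
  .save()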
###Pushing down clauses to Cassandra
The DataFrame API will automatically push down valid WHERE clauses to Cassandra as long as the pushdown option is enabled (it defaults to enabled).
Example Table
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
use test;
CREATE table words ( user text, word text, count int , PRIMARY KEY (user,word));
INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'dino', 10 );
INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'fad', 5 );
INSERT INTO words (user, word, count ) VALUES ( 'Sam', 'alpha', 3 );
INSERT INTO words (user, word, count ) VALUES ( 'Zebra', 'zed', 100 );
First we can create a DataFrame and see that it has no pushdown filters set in the log. This means all requests will go directly to C* and we will require reading all of the data to show this DataFrame.
val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load()
df.explain
//15/07/06 09:21:21 INFO CassandraSourceRelation: filters:
//15/07/06 09:21:21 INFO CassandraSourceRelation: pushdown filters: ArrayBuffer()
//== Physical Plan ==
//PhysicalRDD [user#0,word#1,count#2], MapPartitionsRDD[2] at explain at <console>:22
df.show
//...
//15/07/06 09:26:03 INFO CassandraSourceRelation: filters:
//15/07/06 09:26:03 INFO CassandraSourceRelation: pushdown filters: ArrayBuffer()
//
//+-----+-----+-----+
//| user| word|count|
//+-----+-----+-----+
//|Zebra| zed| 100|
//| Russ| dino| 10|
//| Russ| fad| 5|
//| Sam|alpha| 3|
//+-----+-----+-----+
The example schema has a clustering key of "word" so we can push down filters on that column to C*. We do this by applying a normal DataFrame filter. The connector will automatically determine that the filter can be pushed down and will add it to pushdown filters. All of the elements of pushdown filters will be automatically added to the CQL requests made to C* for the data from this table. The subsequent call will then only serialize data from C* which passes the filter, reducing the load on C*.
val dfWithPushdown = df.filter(df("word") > "ham")
dfWithPushdown.explain
//15/07/06 09:29:10 INFO CassandraSourceRelation: filters: GreaterThan(word,ham)
//15/07/06 09:29:10 INFO CassandraSourceRelation: pushdown filters: ArrayBuffer(GreaterThan(word,ham))
//== Physical Plan ==
//Filter (word#1 > ham)
// PhysicalRDD [user#0,word#1,count#2], MapPartitionsRDD[18] at explain at <console>:24
dfWithPushdown.show
//15/07/06 09:30:48 INFO CassandraSourceRelation: filters: GreaterThan(word,ham)
//15/07/06 09:30:48 INFO CassandraSourceRelation: pushdown filters: ArrayBuffer(GreaterThan(word,ham))
//+-----+----+-----+
//| user|word|count|
//+-----+----+-----+
//|Zebra| zed|  100|
//+-----+----+-----+
####Pushdown Filter Examples
Example Table
create keyspace if not exists pushdowns WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
use pushdowns;
CREATE TABLE pushdownexample
(
partitionkey1 bigint,
partitionkey2 bigint,
partitionkey3 bigint,
clusterkey1 bigint,
clusterkey2 bigint,
clusterkey3 bigint,
regularcolumn bigint,
PRIMARY KEY ((partitionkey1, partitionkey2, partitionkey3), clusterkey1, clusterkey2, clusterkey3)
);
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "pushdownexample", "keyspace" -> "pushdowns"))
  .load()
To push down partition key predicates, all of the partition key columns must be included, with no more than one predicate per partition key column; otherwise nothing is pushed down.
df.filter("partitionkey1 = 1 AND partitionkey2 = 1 AND partitionkey3 = 1").show()
// INFO 2015-08-26 00:37:40 org.apache.spark.sql.cassandra.CassandraSourceRelation: filters: EqualTo(partitionkey1,1), EqualTo(partitionkey2,1), EqualTo(partitionkey3,1)
// INFO 2015-08-26 00:37:40 org.apache.spark.sql.cassandra.CassandraSourceRelation: pushdown filters: ArrayBuffer(EqualTo(partitionkey1,1), EqualTo(partitionkey2,1), EqualTo(partitionkey3,1))
One partition key left out:
df.filter("partitionkey1 = 1 AND partitionkey2 = 1").show()
// INFO 2015-08-26 00:53:07 org.apache.spark.sql.cassandra.CassandraSourceRelation: filters: EqualTo(partitionkey1,1), EqualTo(partitionkey2,1)
// INFO 2015-08-26 00:53:07 org.apache.spark.sql.cassandra.CassandraSourceRelation: pushdown filters: ArrayBuffer()
More than one predicate for partitionkey3:
df.filter("partitionkey1 = 1 AND partitionkey2 = 1 AND partitionkey3 > 0 AND partitionkey3 < 5").show()
// INFO 2015-08-26 00:54:03 org.apache.spark.sql.cassandra.CassandraSourceRelation: filters: EqualTo(partitionkey1,1), EqualTo(partitionkey2,1), GreaterThan(partitionkey3,0), LessThan(partitionkey3,5)
// INFO 2015-08-26 00:54:03 org.apache.spark.sql.cassandra.CassandraSourceRelation: pushdown filters: ArrayBuffer()
Clustering key predicates are more relaxed, but only the last predicate can be non-EQ, and if there is more than one predicate for a column they must not be EQ or IN; otherwise only some of the predicates may be pushed down.
df.filter("clusterkey1 = 1 AND clusterkey2 > 0 AND clusterkey2 < 10").show()
// INFO 2015-08-26 01:01:02 org.apache.spark.sql.cassandra.CassandraSourceRelation: filters: EqualTo(clusterkey1,1), GreaterThan(clusterkey2,0), LessThan(clusterkey2,10)
// INFO 2015-08-26 01:01:02 org.apache.spark.sql.cassandra.CassandraSourceRelation: pushdown filters: ArrayBuffer(EqualTo(clusterkey1,1), GreaterThan(clusterkey2,0), LessThan(clusterkey2,10))
First predicate not EQ:
df.filter("clusterkey1 > 1 AND clusterkey2 > 1").show()
// INFO 2015-08-26 00:55:01 org.apache.spark.sql.cassandra.CassandraSourceRelation: filters: GreaterThan(clusterkey1,1), GreaterThan(clusterkey2,1)
// INFO 2015-08-26 00:55:01 org.apache.spark.sql.cassandra.CassandraSourceRelation: pushdown filters: ArrayBuffer(GreaterThan(clusterkey1,1))
A clusterkey2 EQ predicate combined with a non-EQ predicate:
df.filter("clusterkey1 = 1 AND clusterkey2 = 1 AND clusterkey2 < 10").show()
// INFO 2015-08-26 00:56:37 org.apache.spark.sql.cassandra.CassandraSourceRelation: filters: EqualTo(clusterkey1,1), EqualTo(clusterkey2,1), LessThan(clusterkey2,10)
// INFO 2015-08-26 00:56:37 org.apache.spark.sql.cassandra.CassandraSourceRelation: pushdown filters: ArrayBuffer(EqualTo(clusterkey1,1), EqualTo(clusterkey2,1))