Saturday

Real time data processing with Cassandra, Part 2

For the last few months I was busy with our new telecommunication project to develop MNP (mobile number portability) for the Russian Federation. Anybody in Russia can now change their telephone operator without changing their number, but that is a story for another post. Today I found a few hours to keep my promise. In this post I will describe how to configure and manage a Spark pseudo-cluster with Shark for real-time data processing. In the previous post I showed how to use Hive with Hadoop to process data from Cassandra. For those who are not familiar with them, Spark is an execution engine that supports cyclic data flow and in-memory computing, while Shark is an open source distributed SQL query engine for Hadoop data that brings state-of-the-art performance and advanced analytics to Hive users. I am going to use the following open source projects to configure and run the Spark + Shark cluster:
1) Scala-2.10.3
2) Spark-0.9.0-incubating-bin-hadoop1
3) Shark-0.9.0
4) Hive-0.11.0-bin-shark
5) JDK 1.7
6) Hadoop-1.2.1
7) Cassandra-1.2.7
8) Cash (Hive Cassandra handler)

I reused my Hadoop cluster from the previous post; to configure a Hadoop cluster, please see that post. First, we have to download and install Scala-2.10.3 locally.

Setup Spark:

1) Unzip the downloaded bundle and add the SCALA_HOME to your environment variables. We will use Scala to build Spark and Shark.
2) Download Spark and unzip it into a folder of your choice.
3) From the Spark home directory, run sbt/sbt assembly. You can go for a coffee break: it takes more than 10 minutes to compile and build. If something goes wrong with build please
4) Copy the $SPARK_HOME/conf/spark-env.sh.template file to $SPARK_HOME/conf/spark-env.sh with the command cp spark-env.sh.template spark-env.sh
5) Add the JAVA_HOME variable to spark-env.sh: export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.7/Home
6) chmod +x spark-env.sh
7) Start the Spark master with $SPARK_HOME/sbin/start-master.sh
In the console output you should find the following line:
starting org.apache.spark.deploy.master.Master, logging to /Users/samim/Development/NoSQL/spark/spark-0.9.0-incubating-bin-hadoop1/sbin/../logs/spark-samim-org.apache.spark.deploy.master.Master-1-Shamim-2.local.out
Printing that log file with the cat command shows the following information:
14/04/05 16:01:08 INFO Remoting: Starting remoting
14/04/05 16:01:09 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@Shamim-2.local:7077]
14/04/05 16:01:09 INFO Master: Starting Spark master at spark://Shamim-2.local:7077
14/04/05 16:01:09 INFO MasterWebUI: Started Master web UI at http://192.168.1.46:8080
14/04/05 16:01:09 INFO Master: I have been elected leader! New state: ALIVE
14/04/05 16:01:17 INFO Master: Registering worker Shamim-2.local:1617 with 2 cores, 3.0 GB RAM
In my case http://192.168.1.46:8080 is the web UI and spark://Shamim-2.local:7077 is the Spark master URI.
8) Now start the worker:
$SPARK_HOME/sbin/start-slaves.sh
In the console you should find the log file location as follows:
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/samim/Development/NoSQL/spark/spark-0.9.0-incubating-bin-hadoop1/sbin/../logs/spark-samim-org.apache.spark.deploy.worker.Worker-1-Shamim-2.local.out
Point your browser to http://192.168.1.46:8080 and you should see the Spark master status page.


Now Spark is ready for use.
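
The master URI is needed again later for the Shark configuration, so instead of reading it out of the log by eye it can be extracted with a small shell helper. This is only a sketch: the function name spark_master_uri is made up for illustration, and it assumes the master log contains a "Starting Spark master at" line like the one shown above.

```shell
# Hypothetical helper: print the spark:// URI found in a Spark master log file.
# Usage: spark_master_uri /path/to/master-log.out
spark_master_uri() {
    # Keep only the URI from the "Starting Spark master at spark://host:port" line.
    # If the master was restarted and the line appears several times, use the last one.
    sed -n 's|.*Starting Spark master at \(spark://[^ ]*\).*|\1|p' "$1" | tail -n 1
}
```

Called with the log file path printed by start-master.sh, it should print the value to use as the master URI, for example when setting MASTER for Shark.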
Setup Shark:
1) Download Hive (the hive-0.11.0-bin-shark build listed above).
2) Unzip Hive.
3) Download Cash (Hive Cassandra handler).
4) Unzip it and build with Maven: mvn clean package
5) Copy target/*jar and target/dependency/cassandra-*.jar to $HIVE_HOME/lib
6) Download Shark from GitHub
7) Unzip the archive and run $SHARK_HOME/sbt/sbt package
8) Copy $SHARK_HOME/conf/shark-env.sh.template to $SHARK_HOME/conf/shark-env.sh
9) chmod +x $SHARK_HOME/conf/shark-env.sh
10) Add the following settings to shark-env.sh:
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_09.jdk/Contents/Home
export SPARK_MEM=1g
# (Required) Set the master program's memory
export SHARK_MASTER_MEM=1g
# (Required) Point to your Scala installation.
export SCALA_HOME="/Users/samim/Development/scala/scala-2.10.3"
# (Required) Point to the patched Hive binary distribution
export HIVE_HOME="/Users/samim/Development/NoSQL/hive/hive-0.11.0-bin-shark-0.9.0"
# (Optional) Specify the location of Hive's configuration directory. By default,
# it points to $HIVE_HOME/conf
#export HIVE_CONF_DIR="$HIVE_HOME/conf"
# For running Shark in distributed mode, set the following:
#export HADOOP_HOME=""
export SPARK_HOME="/Users/samim/Development/NoSQL/spark/spark-0.9.0"
export MASTER="spark://Shamim-2.local:7077"

You don't need to set HADOOP_HOME, but a running Hadoop data node is needed for Hive.
Now start Shark with the following command:
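
Most startup problems at this point come from a path in shark-env.sh that does not exist on the machine. A quick sanity check is sketched below; the function name check_home_dirs is hypothetical and not part of Shark, and it only verifies that each named variable points to an existing directory.

```shell
# Hypothetical helper: report variables that are unset or do not point to a directory.
# Returns 0 when every named variable is fine, 1 otherwise.
check_home_dirs() {
    status=0
    for name in "$@"; do
        # Look up the value of the variable whose name is stored in $name.
        eval "value=\${$name:-}"
        if [ -z "$value" ] || [ ! -d "$value" ]; then
            echo "$name is unset or not a directory: '$value'"
            status=1
        fi
    done
    return $status
}

# Example (assumes SHARK_HOME is set in your shell):
# . "$SHARK_HOME/conf/shark-env.sh" && check_home_dirs JAVA_HOME SCALA_HOME HIVE_HOME SPARK_HOME
```

If the function prints nothing and returns 0, all four directories exist; it does not check that they contain the right versions.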
$SHARK_HOME/bin/shark
If everything goes well, you will get the shark prompt:
Logging initialized using configuration in jar:file:/Users/samim/Development/NoSQL/shark/shark-0.9.0/lib_managed/jars/edu.berkeley.cs.shark/hive-common/hive-common-0.11.0-shark.jar!/hive-log4j.properties
Hive history file=/tmp/samim/hive_job_log_samim_1966@Shamim-2.local_201404051958_1374334454.txt
2014-04-05 19:58:36.917 java[1966:1903] Unable to load realm info from SCDynamicStore
8.912: [GC 279616K->14731K(1013632K), 0.0679980 secs]
9.869: [Full GC 77899K->8724K(1013632K), 0.1434890 secs]
Reloading cached RDDs from previous Shark sessions... (use -skipRddReload flag to skip reloading)
10.806: [Full GC 86744K->13497K(1013632K), 0.1476060 secs]
12.345: [Full GC 85586K->19626K(1013632K), 0.2058780 secs]
13.053: [Full GC 47205K->9867K(1013632K), 0.2303530 secs]
13.287: [Full GC 14620K->9906K(1013632K), 0.1734460 secs]
18.762: [Full GC 85934K->16264K(1013632K), 0.2141430 secs]
25.049: [Full GC 237505K->22715K(1013632K), 0.3878240 secs]

30.287: [Full GC 83618K->19037K(1013632K), 0.3781930 secs]

shark>

Now let's play with Shark
1) We assume that the Cassandra node is up and running (see part 1)
2) Create the database test in Shark
shark> create database test;
3) Create an external table in Shark
shark> CREATE EXTERNAL TABLE test.pokes(foo int, bar string)
    STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler'
    WITH SERDEPROPERTIES ("cql.primarykey" = "foo", "comment"="check", "read_repair_chance" = "0.2",
    "dclocal_read_repair_chance" = "0.14", "gc_grace_seconds" = "989898", "bloom_filter_fp_chance" = "0.2",
    "compaction" = "{'class' : 'LeveledCompactionStrategy'}", "replicate_on_write" = "false", "caching" = "all");

4) Run a query
shark> select count(*) from pokes;
185.022: [Full GC 106345K->24340K(1013632K), 0.2817830 secs]
189.722: [Full GC 228461K->30033K(1013632K), 0.3228080 secs]
OK
427
Time taken: 16.106 seconds
5) Now create a cached table for pokes:
shark> CREATE TABLE pokes_cache TBLPROPERTIES ("shark.cache" = "true") AS SELECT * FROM test.pokes;
6) Run the same query against the cached table
shark> select count(*) from pokes_cache;
OK
427
Time taken: 2.158 seconds
Only about 2 seconds, compared with about 16 seconds for the uncached table.
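
To put a number on the difference, the two "Time taken" values can be divided. The helper below is just an illustrative sketch (speedup is a made-up name), and a single run of each query is of course not a benchmark.

```shell
# Hypothetical helper: ratio of two "Time taken" values, printed with two decimals.
# Usage: speedup <slow_seconds> <fast_seconds>
speedup() {
    awk -v slow="$1" -v fast="$2" 'BEGIN { printf "%.2f\n", slow / fast }'
}

# Uncached count(*) versus cached count(*) from the runs above.
speedup 16.106 2.158   # prints 7.46
```

So on this small table of 427 rows the cached query was roughly 7.5 times faster.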
From the Spark master web UI you can analyze the stages run for every query.
All credit goes to the Spark and Shark teams, and also to Brian O'Neil for his encouraging blog post.

References:
1) Shark on Cassandra (w/ Cash) : Interrogating cached data from C* using HiveQL
2) Spark
3) Building Shark from Source Code