Skip to main content

Real time data processing with Cassandra, Part 2

The last few months I was busy with our new telecommunication project to develop MNP (Mobile number portability) for the Russian Federation. Now in Russia anybody can change their telephone operator without changing the number, it's a another history for another blog. Today I have found a few hours to keep my promise. In this blog, I will try to describe how to configure and manage spark pseudo cluster with shark for real time data processing. In the previous blog I will show how to use hive with Hadoop to process data from Cassandra. For whom, who doesn't familiar, Spark is execution engines that supports cyclic data flow and in-memory computing, in the otherhand Shark is an open source distributed SQL query engine for Hadoop data. It brings state-of-the-art performance and advanced analytics to Hive users. I am going to use following open source projects to configure and run the spark + shark cluster :
1) Scala-2.10.3
2) Spark-0.9.0-incubating-bin-hadoop1
3) Shark-0.9.0
4) Hive-0.11.0-bin-shark
6) Jdk 1.7
7) Hadoop-1.2.1
8) Cassandra-1.2.7
9) Cash (Hive Cassandra handler)

I have used my old Hadoop cluster from the previous post, for configure Hadoop cluster, please check my post link above. First, we have to download and install Scala-2.10.3 locally.

Setup Spark:

1) Unzip the downloaded bundle and add the SCALA_HOME to your environment variables. We will use Scala to build Spark and Shark.
2) Download Spark and unzip in any preferable folder.
3) From the home directory of Spark, run sbt/sbt assemble, you can go for coffee break. It will take more than 10 minutes to compile and build. If something goes wrong with build please
4) copy and rename the $SPARK_HOME/conf/spark-env.sh.templete file to $SPARK_HOME/conf/spark-env.sh by command cp spark-env.sh.templete spark-env.sh
5) add JAVA_HOME variable to spark-env.sh export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.7/Home
6) chmod +x spark-env.sh
7) Run spark master by $SPARK_HOME/sbin/start-master.sh
In the out put console you should found the following lines:
starting org.apache.spark.deploy.master.Master, logging to /Users/samim/Development/NoSQL/spark/spark-0.9.0-incubating-bin-hadoop1/sbin/../logs/spark-samim-org.apache.spark.deploy.master.Master-1-Shamim-2.local.out
With cat command you will get the following informations:
14/04/05 16:01:08 INFO Remoting: Starting remoting
14/04/05 16:01:09 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@Shamim-2.local:7077]
14/04/05 16:01:09 INFO Master: Starting Spark master at spark://Shamim-2.local:7077
14/04/05 16:01:09 INFO MasterWebUI: Started Master web UI at http://192.168.1.46:8080
14/04/05 16:01:09 INFO Master: I have been elected leader! New state: ALIVE
14/04/05 16:01:17 INFO Master: Registering worker Shamim-2.local:1617 with 2 cores, 3.0 GB RAM
In my case http://192.168.1.46:8080 will be web UI and the spark://Shamim-2.local:7077 will be the Spark master uri.
8) Now start the worker slave
$SPARK_HOME/sbin/start-slaves.sh
In the console you should find the log file location as follows:
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/samim/Development/NoSQL/spark/spark-0.9.0-incubating-bin-hadoop1/sbin/../logs/spark-samim-org.apache.spark.deploy.worker.Worker-1-Shamim-2.local.out
Point you browser to http://192.168.1.46:8080 and you should find the following page.


Now Spark is ready for use.
Setup Shark:
1) Download Hive from here.
2) Unzip hive
3) Download Cash (Hive Cassandra handler)
4) Unzip and build with maven. mvn clean package.
5) Copy target/*jar and target/dependency/cassandra-*.jar to $HIVE_HOME/lib
6) Download Shark from github
7) Unzip the archive and run $SHARK_HOME/sbt/sbt package
8) Copy and rename $SHARK_HOME/conf/shark-env.sh.template to $SHARK_HOME/conf/shark-env
9) chmod +x $SHARK_HOME/conf/shark-env.sh
10) Add follwing informations to the shark-env.sh
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_09.jdk/Contents/Home
export SPARK_MEM=1g
# (Required) Set the master program's memory
export SHARK_MASTER_MEM=1g
# (Required) Point to your Scala installation.
export SCALA_HOME="/Users/samim/Development/scala/scala-2.10.3"
# (Required) Point to the patched Hive binary distribution
export HIVE_HOME="/Users/samim/Development/NoSQL/hive/hive-0.11.0-bin-shark-0.9.0"
# (Optional) Specify the location of Hive's configuration directory. By default,
# it points to $HIVE_HOME/conf
#export HIVE_CONF_DIR="$HIVE_HOME/conf"
# For running Shark in distributed mode, set the following:
#export HADOOP_HOME=""
export SPARK_HOME="/Users/samim/Development/NoSQL/spark/spark-0.9.0"
export MASTER="spark://Shamim-2.local:7077"

You don't need to add the Hadoop_home but hadoop data node will be need for Hive.
Now run shark with follwing commands:
$SHARK_HOME/bin/shark
If every thing goes well you will get the shark prompt
Logging initialized using configuration in jar:file:/Users/samim/Development/NoSQL/shark/shark-0.9.0/lib_managed/jars/edu.berkeley.cs.shark/hive-common/hive-common-0.11.0-shark.jar!/hive-log4j.properties
Hive history file=/tmp/samim/hive_job_log_samim_1966@Shamim-2.local_201404051958_1374334454.txt
2014-04-05 19:58:36.917 java[1966:1903] Unable to load realm info from SCDynamicStore
8.912: [GC 279616K->14731K(1013632K), 0.0679980 secs]
9.869: [Full GC 77899K->8724K(1013632K), 0.1434890 secs]
Reloading cached RDDs from previous Shark sessions... (use -skipRddReload flag to skip reloading)
10.806: [Full GC 86744K->13497K(1013632K), 0.1476060 secs]
12.345: [Full GC 85586K->19626K(1013632K), 0.2058780 secs]
13.053: [Full GC 47205K->9867K(1013632K), 0.2303530 secs]
13.287: [Full GC 14620K->9906K(1013632K), 0.1734460 secs]
18.762: [Full GC 85934K->16264K(1013632K), 0.2141430 secs]
25.049: [Full GC 237505K->22715K(1013632K), 0.3878240 secs]

30.287: [Full GC 83618K->19037K(1013632K), 0.3781930 secs]

shark>

Now lets play with Shark
1) We assume that Cassnadra node is up and runing (see part 1)
2) Create data base test in Shark
shark> create database test;
3) Create External table in Shark
shark> CREATE EXTERNAL TABLE test.pokes(foo int, bar string)
    STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler'
    WITH SERDEPROPERTIES ("cql.primarykey" = "foo", "comment"="check", "read_repair_chance" = "0.2",
    "dclocal_read_repair_chance" = "0.14", "gc_grace_seconds" = "989898", "bloom_filter_fp_chance" = "0.2",
    "compaction" = "{'class' : 'LeveledCompactionStrategy'}", "replicate_on_write" = "false", "caching" = "all");

4) run some command
shark> select count(*) from pokes;
185.022: [Full GC 106345K->24340K(1013632K), 0.2817830 secs]
189.722: [Full GC 228461K->30033K(1013632K), 0.3228080 secs]
OK
427
Time taken: 16.106 seconds
5) Now create cache table for pokes:
shark> CREATE TABLE pokes_cache TBLPROPERTIES ("shark.cache" = "true") AS SELECT * FROM test.pokes;
6) Run query
shark> select count(*) from pokes_cache;
OK
427
Time taken: 2.158 seconds
Only 2 seconds.
From the spark master web UI you can anyalize the stage running for every query
All the credit goes for Spark, Shark teams also to brian Oneil for this encourging blog.

References:
1) Shark on Cassandra (w/ Cash) : Interrogating cached data from C* using HiveQL
2) Spark
3) Building Shark from Source Code

Comments

Popular posts from this blog

Send e-mail with attachment through OSB

Oracle Service Bus (OSB) contains a good collection of adapter to integrate with any legacy application, including ftp, email, MQ, tuxedo. However e-mail still recognize as a stable protocol to integrate with any application asynchronously. Send e-mail with attachment is a common task of any business process. Inbound e-mail adapter which, integrated with OSB support attachment but outbound adapter doesn't. This post is all about sending attachment though JavaCallout action. There are two ways to handle attachment in OSB: 1) Use JavaCallout action to pass the binary data for further manipulation. It means write down a small java library which will get the attachment and send the e-mail. 2) Use integrated outbound e-mail adapter to send attachment, here you have to add a custom variable named attachment and assign the binary data to the body of the attachment variable. First option is very common and easy to implement through javax.mail api, however a much more developer manage t

Tip: SQL client for Apache Ignite cache

A new SQL client configuration described in  The Apache Ignite book . If it got you interested, check out the rest of the book for more helpful information. Apache Ignite provides SQL queries execution on the caches, SQL syntax is an ANSI-99 compliant. Therefore, you can execute SQL queries against any caches from any SQL client which supports JDBC thin client. This section is for those, who feels comfortable with SQL rather than execute a bunch of code to retrieve data from the cache. Apache Ignite out of the box shipped with JDBC driver that allows you to connect to Ignite caches and retrieve distributed data from the cache using standard SQL queries. Rest of the section of this chapter will describe how to connect SQL IDE (Integrated Development Environment) to Ignite cache and executes some SQL queries to play with the data. SQL IDE or SQL editor can simplify the development process and allow you to get productive much quicker. Most database vendors have their own front-en

Load balancing and fail over with scheduler

Every programmer at least develop one Scheduler or Job in their life time of programming. Nowadays writing or developing scheduler to get you job done is very simple, but when you are thinking about high availability or load balancing your scheduler or job it getting some tricky. Even more when you have a few instance of your scheduler but only one can be run at a time also need some tricks to done. A long time ago i used some data base table lock to achieved such a functionality as leader election. Around 2010 when Zookeeper comes into play, i always preferred to use Zookeeper to bring high availability and scalability. For using Zookeeper you have to need Zookeeper cluster with minimum 3 nodes and maintain the cluster. Our new customer denied to use such a open source product in their environment and i was definitely need to find something alternative. Definitely Quartz was the next choose. Quartz makes developing scheduler easy and simple. Quartz clustering feature brings the HA and