Sunday

Book Review: Cassandra Design Patterns

This post is my review of the Packt Publishing book Cassandra Design Patterns by Sanjay Sharma. As the title suggests, it is all about patterns and anti-patterns of using Cassandra. The book runs about 74 pages across six chapters.
Preface: What this book covers and Who this book is for
The preface opens with the ideas behind the book: the main one is to help the Cassandra audience understand where and how to use Cassandra correctly and effectively. The preface also provides brief summaries of each of the six chapters and the conventions followed in the book.
Under the section "What you need for this book", the author specifies that readers don't need any particular version of Cassandra, although Cassandra 2.0 or above is preferred. The "Who this book is for" section describes the audience: architects or developers getting started with Cassandra.
Chapter 1: An Overview of Architecture and Data Modeling in Cassandra
In this chapter the author briefly describes the architecture and history of Cassandra. Most Cassandra books and articles mention this important background, and it is really nice to see how Cassandra picks up and combines the best features of two technologies: Google Bigtable for the data model and Amazon Dynamo for scale-out. The author also covers the core Cassandra architecture, how Cassandra handles writes and reads under the hood, consistency levels, and much more. On one point I can't agree with the author: read performance. When the data is not in cache, Cassandra is not fast at reads, since every uncached read needs around two disk I/O operations, which slows reads down.
In the last part of the chapter the author outlines Cassandra's features, which is very useful.
Chapter 2: An Overview of Case and Design Patterns
This chapter introduces a few key use cases and design patterns that are discussed in more detail in the following chapters. First the author describes the 3V model and how Cassandra fits it. The next section covers Cassandra's high-availability architecture and compares it with Oracle RDBMS. In the next few sections the author introduces Cassandra's schema flexibility, counter columns, streaming analytics capabilities, and more. Like the first chapter, this one provides additional background on Cassandra's strengths and features.
Chapter 3: 3V Patterns
The third chapter covers the fundamental patterns of Cassandra and describes where and how Cassandra should be used. The first part of the chapter shows how Cassandra can handle huge amounts of data to scale a web application, and explains why an RDBMS such as Oracle or Teradata cannot keep up by simply scaling vertically. Finally it describes the pattern's solution, focusing on Cassandra CQL and the third-party frameworks that enrich Cassandra.
The second pattern focuses on Cassandra's fast writes. Cassandra supports parallel writes, with each node in the cluster responsible for a specific key range, which sets it apart from a traditional RDBMS. The author also provides a benchmark from Netflix, which is impressive and does a good job of backing up the pattern.
The last pattern of this chapter describes Cassandra's flexible-schema feature. Borrowed from Google Bigtable, it allows Cassandra to store data in multiple formats, and it is one of the main advantages over an RDBMS, where changing the schema online is typically hard and disruptive.
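To make the point concrete, here is a small example of my own (not taken from the book): adding a column in CQL is an online, metadata-only change, and existing rows simply have no value for the new column. The users table and its columns here are hypothetical.

-- add a new column to a table that is already serving traffic
ALTER TABLE users ADD middle_name text;

-- new writes may populate it; old rows return null for middle_name
INSERT INTO users (user_id, first_name, middle_name) VALUES ('u1', 'Ivan', 'Petrovich');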
Chapter 4: Core Cassandra Patterns
The fourth chapter starts with Cassandra's fundamental feature: high availability. Cassandra provides a highly available data store with peer-to-peer communication between nodes, and it gives fine-grained control over how data is spread and replicated across different data centers. The example with Oracle GoldenGate was very informative and helpful.
The next section covers time series data manipulation in Cassandra. The author explains the term time series with examples and provides a solution with CQL pseudo code, which clears up the concept and shows how to store and retrieve time series data from Cassandra. A few words about the KairosDB project round out the section.
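For readers who haven't seen the pattern before, here is a minimal sketch of my own (a hypothetical table, not the book's code): partition by the data source and cluster by timestamp, so that one sensor's readings sit together on disk, ordered by time.

CREATE TABLE temperature_by_sensor (
    sensor_id  text,        -- partition key: all readings of one sensor in one partition
    event_time timestamp,   -- clustering column: readings ordered by time
    value      double,
    PRIMARY KEY (sensor_id, event_time)
) WITH CLUSTERING ORDER BY (event_time DESC);

-- slice query: the readings of one sensor for a time range
SELECT event_time, value FROM temperature_by_sensor
WHERE sensor_id = 'sensor-42'
  AND event_time >= '2014-05-01' AND event_time < '2014-06-01';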
The last pattern in this chapter explains when and how to use counter columns to keep track of events or content. The pseudo-code example fully clears up the concept and shows the reader how it works under the hood.
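A rough sketch of the idea in CQL (again my own hypothetical table, not the book's code): counter columns live in their own table and are updated with increments, without reading the current value first.

CREATE TABLE page_view_counts (
    page_name text PRIMARY KEY,
    views     counter        -- counters can only be incremented or decremented
);

-- record one more view; no read-before-write is needed
UPDATE page_view_counts SET views = views + 1 WHERE page_name = 'home';

SELECT views FROM page_view_counts WHERE page_name = 'home';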
Chapter 5: Search and Analytics Applied Use Case Patterns
Chapter five focuses on the serious topics of data analysis and search. Every enterprise application needs some search capability over its data, whether a simple search or a complex contextual search. Data analysis is another business challenge, and it can be laborious. The first section of this chapter focuses on streaming, or real-time, analytics. The author provides a reference architecture that combines the Storm framework with Cassandra for real-time analytics: with a Storm bolt you can easily precompute values or aggregate data to alert on certain events.
The second section of this chapter is dedicated to enterprise search. Cassandra, like most databases, doesn't support enterprise search out of the box. For that, anyone can use the two most popular search engines, Solr and Elasticsearch, both based on Lucene. The author explains what an inverted index is and when not to use Cassandra's secondary indexes.
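My own rule of thumb, illustrated with a hypothetical table (not from the book): a secondary index is fine for occasional lookups on a column of modest cardinality, but it cannot replace an inverted index for text search.

-- acceptable: an occasional lookup by a non-key column
CREATE INDEX users_by_country ON users (country);
SELECT * FROM users WHERE country = 'RU';

-- not suitable: free-text matching such as "name contains ...";
-- that is the job of an inverted index in Solr or Elasticsearch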
In the third section the author focuses on graph analysis. The Cassandra data model is not a natural fit for graphs, and there are databases built specifically for this kind of model; Neo4j is one of the popular graph databases suited to such tasks. But if somebody has to run graph analysis over Cassandra data, the Titan framework can solve the problem.
The final section of this chapter is dedicated to Hadoop and Cassandra integration. The topic is big enough to fill a few more volumes. Cassandra provides the ColumnFamilyInputFormat and ColumnFamilyOutputFormat classes that help run MapReduce jobs on Hadoop. You can use Pig (a data-flow language) to run batch analysis over Cassandra data from Hadoop MapReduce, and you can even use Hive-like queries. The author does not mention other frameworks, such as Spark or Presto, for fast data analysis.
Chapter 6: Patterns and Anti-patterns
The last chapter focuses on some additional patterns and anti-patterns and was very interesting to read. The content/document store pattern answers the question: which data store should be used as a content/document store? Under the hood Cassandra stores all data as raw bytes, which allows content or documents to be stored as raw bytes in column values. The author also mentions the Astyanax framework, which supports storing and retrieving large objects in chunks.
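In CQL terms the pattern boils down to a blob column; a sketch of my own below (the chunking scheme is simplified compared to what Astyanax actually does, and the table is hypothetical):

CREATE TABLE documents (
    doc_id   uuid,
    chunk_no int,            -- large objects are split into chunks by the application
    content  blob,           -- raw bytes of one chunk
    PRIMARY KEY (doc_id, chunk_no)
);

-- the application writes the chunks one by one and reads them back in order
INSERT INTO documents (doc_id, chunk_no, content)
VALUES (c37d661d-7e61-49ea-96a5-68c34e83db3a, 0, textAsBlob('first chunk ...'));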
The materialized view pattern I found very useful and explained in detail. The author introduces two ways of implementing materialized views in Cassandra: application-tier-driven materialized views and analytics-driven materialized views.
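The application-tier-driven variant is essentially manual denormalization. A minimal sketch of my own (hypothetical tables; Cassandra of this era has no built-in materialized views, so the application keeps both tables in sync on every write):

-- base table: orders looked up by user
CREATE TABLE orders_by_user (
    user_id  text,
    order_id timeuuid,
    status   text,
    PRIMARY KEY (user_id, order_id)
);

-- the "materialized view": the same data copied into a second table
-- that serves a different query path (orders looked up by status)
CREATE TABLE orders_by_status (
    status   text,
    order_id timeuuid,
    user_id  text,
    PRIMARY KEY (status, order_id)
);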
The last part of the chapter covers the messaging queue anti-pattern. Many times I have heard from people that they use Cassandra as a messaging queue, and when I asked them why, most of them couldn't answer; a few of them had tried to build a distributed persistent queue on Cassandra. There are Hazelcast and many other products to use as a distributed queue. I have to agree with the author that a messaging queue is an anti-pattern for Cassandra.
Summary:
I really enjoyed Cassandra Design Patterns and recommend it to anyone interested in learning about Cassandra. The author touches on most of the main Cassandra topics and explains them in an accessible way. These days we mostly depend on DataStax for Cassandra documentation, and good documentation is not easy to come by, so this book can be a major source of information for deciding when and how to use Cassandra for real-life problems. Thanks to the author Sanjay Sharma for such a nice book, and to Packt Publishing for giving me the chance to review it.

Saturday

Elasticsearch with Cassandra data

Sooner or later every enterprise application needs full-text search over its content. Solr and Elasticsearch, both based on Lucene, are among the best candidates for building enterprise search. Elasticsearch has become very popular thanks to its simplicity, but out of the box it doesn't support importing data from a Cassandra cluster. However, Elasticsearch provides rivers: a river is a pluggable service running within the Elasticsearch cluster, pulling data (or being pushed data) that is then indexed into the cluster. With a quick search I found a cassandra-river project on GitHub from eBay; unfortunately, the project was legacy and only supported Cassandra 1.2.x. With a little effort I rewrote the project with the DataStax Cassandra driver. Here you can find the project; it now supports the following features:
1) Cron scheduling;
2) Reading Cassandra rows through paging;
3) Based on the DataStax Java driver 2.0.

For a quick installation, download the project from GitHub and build it with Maven:
mvn clean install

This will create the river plugin at target/releases/cassandra-river-1.0-SNAPSHOT.zip. To install the river plugin you can use the plugin command-line utility.
From the elasticsearch_home/bin directory run the following command:
./plugin --url file:/PATH/cassandra-river-1.0-SNAPSHOT.zip --install cassandra-river
Now you can start Elasticsearch and initialize the river with the following command:
curl -XPUT 'http://HOST:PORT/_river/cassandra-river/_meta' -d '{
    "type" : "cassandra",
    "cassandra" : {
        "cluster_name" : "Test Cluster",
        "keyspace" : "nortpole",
        "column_family" : "users",
        "batch_size" : 20000,
        "hosts" : "localhost",
        "dcName" : "DC",
        "cron"  : "0/60 * * * * ?"
    },
    "index" : {
        "index" : "prodinfo",
        "type" : "product"
    }
}'
It should start pulling data from your Cassandra cluster.
To remove the plugin use:
./plugin --remove cassandra-river

If you have installed the Elasticsearch _head plugin, you can browse the indexed data and run searches from its web UI.
Improvement plans:
1) Add unit tests;
2) Update the index in ES;
3) Index newly added rows in ES by date;
4) Add multi-table support.


Sunday

Interview on PlanetCassandra

Interview on PlanetCassandra.

Saturday

Ad-hoc analysis over Cassandra data with Facebook Presto

A few days ago I attended the Moscow Cassandra meetup with my presentation, and from one of the participants I heard about Facebook's Presto project for fast data analysis. I was very curious and hurried to get hands-on with it.
From the Presto site: "Presto is a distributed SQL query engine optimized for ad-hoc analysis at interactive speed. It supports standard ANSI SQL, including complex queries, aggregations, joins, and window functions."
Historically Cassandra has lacked interactive ad-hoc queries; CQL doesn't even support aggregate functions. For this reason, whenever we proposed Cassandra as a database to our customers, they were always hesitant. Still, for analyzing data over Cassandra we have the following frameworks:
1) Hadoop MapReduce
2) Spark and Shark
There are also a few commercial projects, such as Impala.
But Hadoop MapReduce is definitely too slow for ad-hoc queries. Spark is very fast with its RDD data model, but it also takes some effort to run queries, and Spark with Shark even needs Hadoop HDFS to run queries over Cassandra. For these reasons I have been looking for a SQL engine that can run over Cassandra data completely on its own. Here comes Presto with its simple architecture.
From the Presto overview "The execution model of Presto is fundamentally different from Hive/MapReduce. Hive translates queries into multiple stages of MapReduce tasks that execute one after another. Each task reads inputs from disk and writes intermediate output back to disk. In contrast, the Presto engine does not use MapReduce. It employs a custom query and execution engine with operators designed to support SQL semantics. In addition to improved scheduling, all processing is in memory and pipelined across the network between stages. This avoids unnecessary I/O and associated latency overhead. The pipelined execution model runs multiple stages at once, and streams data from one stage to the next as it becomes available. This significantly reduces end-to-end latency for many types of queries".

Let's quickly set up a cluster and examine what it can do with Cassandra data. For this blog post I will use a 4-node Cassandra cluster with 2 data centers: one data center only for data, and the second only for data analysis. Physically I am going to use 5 virtual machines, one of them for the Presto coordinator.
First, we have to download two files: the Presto server and the Presto CLI. Now, let's set up the cluster.
1) First, we will set up the coordinator node. Unzip the Presto distribution somewhere on disk and configure node.properties, jvm.config, config.properties and log.properties according to the documentation.
For the coordinator node you should set up coordinator=true in the config.properties.
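Roughly, the coordinator's config.properties looks like the sketch below (based on my reading of the Presto deployment docs of that era; the port and discovery URI are placeholders to adjust for your environment):
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
discovery-server.enabled=true
discovery.uri=http://coordinator_host:8080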
2) Similarly, set up the worker nodes. Make sure that config.properties has the following properties:
coordinator=false
discovery-server.enabled=false
3) After installing Presto, you need to deploy the Cassandra plugin to every Presto node.

On all Presto nodes (coordinator and worker nodes), add cassandra.properties to $PRESTO_HOME/etc/catalog (see the example below):
connector.name=cassandra
cassandra.contact-points=host1
In my case I have two Cassandra nodes for analysis, so I have a 3-node Presto cluster.

I have created a keyspace named mnpkeyspace in Cassandra and one CQL3 column family in it. The CF has the following data model:
CREATE TABLE event_log (
  request_id text,
  start_date timestamp,
  ctn text,
  event_name text,
  process_type text,
  id text,
  ban text,
  end_date timestamp,
  error boolean,
  info text,
  npid text,
  proc_inst_id text,
  system_name text,
  user_name text,
  xml_message text,
  PRIMARY KEY (request_id, start_date, ctn, event_name, process_type, id)
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='' AND
  dclocal_read_repair_chance=0.000000 AND
  gc_grace_seconds=864000 AND
  index_interval=128 AND
  read_repair_chance=0.100000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  default_time_to_live=0 AND
  speculative_retry='99.0PERCENTILE' AND
  memtable_flush_period_in_ms=0 AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};
It contains the following sample data (truncated cqlsh output):
request_id |  start_date   |    ctn    | event_name  | process_type  |                  id                  | ban | end_date | error |
------------+---------------+-----------+-------------+---------------+--------------------------------------+-----+----------+-------+-
 1610250    | 1400852233559 | 123456789 | Provisining | PORTIN1610250 | 1e18abe7-e0e3-4dcf-9af2-351e7906cf19 | BAN | NULL     | false |
 1227843    | 1400851565307 | 123456789 | Provisining | PORTIN1227843 | a063d8cf-1865-4ef8-aedf-2dca7ba8278a | BAN | NULL     | false |
4) Now it's time to run some SQL queries. We use the Presto command-line interface to run them:
java -jar presto-cli-0.68-executable.jar --server coordinator_host:port --catalog cassandra --schema mnpkeyspace
If everything goes well, you should get the presto:mnpkeyspace command prompt.
presto:mnpkeyspace> select count(*) from event_log;
  _col0
---------
 3000019
(1 row)

Query 20140524_163558_00010_5dhjs, FINISHED, 3 nodes
Splits: 1,001 total, 1,001 done (100.00%)
2:28 [3M rows, 2.86MB] [20.3K rows/s, 19.8KB/s]

Let's check the summary:
3 million rows in 2:28 minutes, impressive. Let's try a group by (note that only the counts are printed, since event_name itself is not selected):
presto:mnpkeyspace> select count(*) from event_log group by event_name;
  _col0
---------
       3
 2999999
       4
       3
       1
       1
       1
       4
       3
(9 rows)

Query 20140524_163948_00011_5dhjs, FINISHED, 3 nodes
Splits: 1,004 total, 1,004 done (100.00%)
2:26 [3M rows, 2.86MB] [20.6K rows/s, 20.1KB/s]
Let's make another try, this time with the aggregate function max:
presto:mnpkeyspace> select max(start_date) from event_log group by event_name;
     _col0
---------------
 1400851465281
 1400851434090
 1400851432367
 1400855180836
 1400851464236
 1400851465038
 1400851210754
 1400851210410
 1400851210510
(9 rows)

Query 20140524_164959_00012_5dhjs, FINISHED, 3 nodes
Splits: 1,004 total, 1,004 done (100.00%)
2:20 [3M rows, 2.86MB] [21.4K rows/s, 20.9KB/s]
An average read rate of 21.4K rows per second is really impressive; note that I have only 2 analytical Cassandra nodes with 4 CPUs and 16 GB RAM. That's enough for today. Next time I plan to examine more analytical functions, including window functions, and to try a benchmark against Spark. All the credit goes to the Facebook team. Happy weekend!
References:
1) https://www.facebook.com/notes/facebook-engineering/presto-interacting-with-petabytes-of-data-at-facebook/10151786197628920
2) http://prestodb.io/docs/current/index.html

Tuesday

3rd Moscow Cassandra meetup

3rd Moscow Cassandra meetup presentation: using Spark and Shark to analyze big data stored in Cassandra.

Saturday

Real time data processing with Cassandra, Part 2

For the last few months I have been busy with our new telecommunications project, developing MNP (Mobile Number Portability) for the Russian Federation. Now anybody in Russia can change their telephone operator without changing their number, but that is a story for another blog post. Today I found a few hours to keep my promise. In this post I will describe how to configure and manage a Spark pseudo-cluster with Shark for real-time data processing. In the previous post I showed how to use Hive with Hadoop to process data from Cassandra. For those who aren't familiar with them, Spark is an execution engine that supports cyclic data flow and in-memory computing, while Shark is an open source distributed SQL query engine for Hadoop data that brings state-of-the-art performance and advanced analytics to Hive users. I am going to use the following open source projects to configure and run the Spark + Shark cluster:
1) Scala-2.10.3
2) Spark-0.9.0-incubating-bin-hadoop1
3) Shark-0.9.0
4) Hive-0.11.0-bin-shark
5) JDK 1.7
6) Hadoop-1.2.1
7) Cassandra-1.2.7
8) Cash (Hive Cassandra handler)

I have used my old Hadoop cluster from the previous post; to configure a Hadoop cluster, please check the post linked above. First, we have to download and install Scala 2.10.3 locally.

Setup Spark:

1) Unzip the downloaded bundle and add the SCALA_HOME to your environment variables. We will use Scala to build Spark and Shark.
2) Download Spark and unzip in any preferable folder.
3) From the Spark home directory, run sbt/sbt assembly and go for a coffee break; it will take more than 10 minutes to compile and build. If something goes wrong with the build, check the sbt output for errors.
4) Copy and rename the $SPARK_HOME/conf/spark-env.sh.template file to $SPARK_HOME/conf/spark-env.sh with the command cp spark-env.sh.template spark-env.sh
5) Add the JAVA_HOME variable to spark-env.sh: export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.7/Home
6) chmod +x spark-env.sh
7) Start the Spark master with $SPARK_HOME/sbin/start-master.sh
In the console output you should find the following line:
starting org.apache.spark.deploy.master.Master, logging to /Users/samim/Development/NoSQL/spark/spark-0.9.0-incubating-bin-hadoop1/sbin/../logs/spark-samim-org.apache.spark.deploy.master.Master-1-Shamim-2.local.out
Running cat on that log file will show the following information:
14/04/05 16:01:08 INFO Remoting: Starting remoting
14/04/05 16:01:09 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@Shamim-2.local:7077]
14/04/05 16:01:09 INFO Master: Starting Spark master at spark://Shamim-2.local:7077
14/04/05 16:01:09 INFO MasterWebUI: Started Master web UI at http://192.168.1.46:8080
14/04/05 16:01:09 INFO Master: I have been elected leader! New state: ALIVE
14/04/05 16:01:17 INFO Master: Registering worker Shamim-2.local:1617 with 2 cores, 3.0 GB RAM
In my case http://192.168.1.46:8080 is the web UI and spark://Shamim-2.local:7077 is the Spark master URI.
8) Now start the worker:
$SPARK_HOME/sbin/start-slaves.sh
In the console you should find the log file location as follows:
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/samim/Development/NoSQL/spark/spark-0.9.0-incubating-bin-hadoop1/sbin/../logs/spark-samim-org.apache.spark.deploy.worker.Worker-1-Shamim-2.local.out
Point your browser to http://192.168.1.46:8080 and you should see the Spark master web UI.


Now Spark is ready for use.
Setup Shark:
1) Download Hive from here.
2) Unzip hive
3) Download Cash (Hive Cassandra handler)
4) Unzip and build with maven. mvn clean package.
5) Copy target/*jar and target/dependency/cassandra-*.jar to $HIVE_HOME/lib
6) Download Shark from github
7) Unzip the archive and run $SHARK_HOME/sbt/sbt package
8) Copy and rename $SHARK_HOME/conf/shark-env.sh.template to $SHARK_HOME/conf/shark-env.sh
9) chmod +x $SHARK_HOME/conf/shark-env.sh
10) Add the following to shark-env.sh:
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_09.jdk/Contents/Home
export SPARK_MEM=1g
# (Required) Set the master program's memory
export SHARK_MASTER_MEM=1g
# (Required) Point to your Scala installation.
export SCALA_HOME="/Users/samim/Development/scala/scala-2.10.3"
# (Required) Point to the patched Hive binary distribution
export HIVE_HOME="/Users/samim/Development/NoSQL/hive/hive-0.11.0-bin-shark-0.9.0"
# (Optional) Specify the location of Hive's configuration directory. By default,
# it points to $HIVE_HOME/conf
#export HIVE_CONF_DIR="$HIVE_HOME/conf"
# For running Shark in distributed mode, set the following:
#export HADOOP_HOME=""
export SPARK_HOME="/Users/samim/Development/NoSQL/spark/spark-0.9.0"
export MASTER="spark://Shamim-2.local:7077"

You don't need to set HADOOP_HOME, but a Hadoop data node will be needed for Hive.
Now run Shark with the following command:
$SHARK_HOME/bin/shark
If everything goes well you will get the shark prompt:
Logging initialized using configuration in jar:file:/Users/samim/Development/NoSQL/shark/shark-0.9.0/lib_managed/jars/edu.berkeley.cs.shark/hive-common/hive-common-0.11.0-shark.jar!/hive-log4j.properties
Hive history file=/tmp/samim/hive_job_log_samim_1966@Shamim-2.local_201404051958_1374334454.txt
2014-04-05 19:58:36.917 java[1966:1903] Unable to load realm info from SCDynamicStore
8.912: [GC 279616K->14731K(1013632K), 0.0679980 secs]
9.869: [Full GC 77899K->8724K(1013632K), 0.1434890 secs]
Reloading cached RDDs from previous Shark sessions... (use -skipRddReload flag to skip reloading)
10.806: [Full GC 86744K->13497K(1013632K), 0.1476060 secs]
12.345: [Full GC 85586K->19626K(1013632K), 0.2058780 secs]
13.053: [Full GC 47205K->9867K(1013632K), 0.2303530 secs]
13.287: [Full GC 14620K->9906K(1013632K), 0.1734460 secs]
18.762: [Full GC 85934K->16264K(1013632K), 0.2141430 secs]
25.049: [Full GC 237505K->22715K(1013632K), 0.3878240 secs]

30.287: [Full GC 83618K->19037K(1013632K), 0.3781930 secs]

shark>

Now let's play with Shark.
1) We assume that the Cassandra node is up and running (see part 1).
2) Create a database named test in Shark:
shark> create database test;
3) Create an external table in Shark:
shark> CREATE EXTERNAL TABLE test.pokes(foo int, bar string)
    STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler'
    WITH SERDEPROPERTIES ("cql.primarykey" = "foo", "comment"="check", "read_repair_chance" = "0.2",
    "dclocal_read_repair_chance" = "0.14", "gc_grace_seconds" = "989898", "bloom_filter_fp_chance" = "0.2",
    "compaction" = "{'class' : 'LeveledCompactionStrategy'}", "replicate_on_write" = "false", "caching" = "all");

4) Run a query:
shark> select count(*) from pokes;
185.022: [Full GC 106345K->24340K(1013632K), 0.2817830 secs]
189.722: [Full GC 228461K->30033K(1013632K), 0.3228080 secs]
OK
427
Time taken: 16.106 seconds
5) Now create a cached table for pokes:
shark> CREATE TABLE pokes_cache TBLPROPERTIES ("shark.cache" = "true") AS SELECT * FROM test.pokes;
6) Run the same query against the cached table:
shark> select count(*) from pokes_cache;
OK
427
Time taken: 2.158 seconds
Only 2 seconds.
From the Spark master web UI you can analyze the stages that run for every query.
All the credit goes to the Spark and Shark teams, and also to Brian O'Neill for his encouraging blog post.

References:
1) Shark on Cassandra (w/ Cash) : Interrogating cached data from C* using HiveQL
2) Spark
3) Building Shark from Source Code