Centralize logs with logstash

Nowadays logging is an essential part of any application. Logging useful pieces of information makes it much easier to find errors, fix bugs and more. Modern applications scale out to hundreds of servers in the cloud. Managing and monitoring the logs of such a heterogeneous system is very challenging for any system administrator, and even more challenging for developers trying to fix bugs. Last Friday evening we started testing with 3rd party products and got stuck on a few bugs. As usual, we first looked at the logs and tried to find some hints to reproduce the bugs. Here we ran into a serious problem: our application is spread across several application servers, such as Oracle GlassFish and Apache Tomcat. It was not a pleasant moment, searching all over the servers to find a few pieces of information. We understood that we needed a tool to manage, monitor and search logs. From my experience we have a few options:
1) Flume, Hadoop HDFS and ElasticSearch.
2) Kafka, Storm and Solr.
3) Logstash and Graylog2.
We have implemented the first option, with Hadoop, in a few cases, but for the current project it looks like too big a gun. The second option also needs some configuration and coding experience to build the log management tools from scratch. My aim was to use something new and elegant that can be configured with little effort and is easy to use. I had heard about logstash a few times and decided to give it a try. In the rest of the post I will describe how to install and configure logstash for centralizing logs, i.e. collecting, aggregating and searching them. The main features of logstash are as follows:
1) Collecting log through agents
2) Aggregating logs
3) Shipping the logs to ElasticSearch
4) Web interface for searching logs
5) Open source
6) Everything in one jar, nothing more.
7) Very well documented with examples

Take a look at the high level architecture of logstash:
For centralized logging you need the following components:
1) ElasticSearch
2) Redis
I went through the getting started page and everything ran like a charm. The only error I got was when I tried to install Redis.
$ make
clang: error: no such file or directory: '../deps/hiredis/libhiredis.a'
clang: error: no such file or directory: '../deps/lua/src/liblua.a'
make[1]: *** [redis-server] Error 1
make: *** [all] Error 2
A quick search on the internet turned up the solution. First build the dependencies:
$ cd deps
$ make lua hiredis linenoise
and then finish the installation:
$ cd $REDIS_CODE/src
$ make
In my case I wanted to collect logs from the GlassFish server.log, and used the following basic configuration for the agent:
input {
  file {
    type => "server"

    # Wildcards work here :)
    path => [ "$DOMAIN_HOME/logs/*.log" ]
  }
}

output {
  #stdout { codec => rubydebug }
  redis { host => "" data_type => "list" key => "crm" }
}
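With data_type set to "list", Redis acts as a simple queue between the shipping agents and the indexer: agents append events to the list named by key, and the indexer takes them off the other end in arrival order. The sketch below only illustrates that first-in, first-out behaviour. It uses a Python deque as a stand-in for the Redis list, so no Redis client is involved, and the event fields and the function names ship and consume are made up for the example.

```python
import json
from collections import deque

# Stand-in for the Redis list stored under the key "crm" (the key used in the
# agent configuration). A real setup would talk to a Redis server instead.
broker = {"crm": deque()}


def ship(key, event):
    """Agent side: append a serialized event to the tail of the list (like RPUSH)."""
    broker[key].append(json.dumps(event))


def consume(key):
    """Indexer side: take the oldest event from the head of the list (like LPOP)."""
    if not broker[key]:
        return None  # nothing queued
    return json.loads(broker[key].popleft())


# Hypothetical events, as two agents might ship them.
ship("crm", {"type": "server", "message": "first line"})
ship("crm", {"type": "server", "message": "second line"})

first = consume("crm")
print(first["message"])
```

Because the list is a queue, a slow or restarted indexer does not lose events; they simply wait in Redis until they are consumed.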
That's all. Happy coding and blogging.

Real time data processing with Cassandra, Part 1

This is the first part of getting started with real time data processing with Cassandra. In this part I am going to describe how to configure Hadoop, Hive and Cassandra, and run some ad hoc queries using the new CqlStorageHandler. In the second part I will show how to use Shark and Spark for fast real time data processing with Cassandra. I was encouraged by a blog post from DataStax, which you can find here. All the credit goes to the author of the cassandra-handler library and to Alex Lui for developing the CQLCassandraStorage. Of course you can use the DataStax Enterprise version for the first part; it has built-in support for Hive and Hadoop. In this blog post I will use only the native Apache products. If you are interested in real time data processing, please check this blog.
In the first part I will use the following products:
1) Hadoop 1.2.1 (Single node cluster)
2) Hive 0.9.0
3) Cassandra 1.2.6 (Single node cluster)
4) cassandra-handler 1.2.6 (depends on Hive version 0.9.1, does not work with other versions of Hive)
Let's first download and configure Hadoop. Please check my old post on configuring Hadoop; the configuration steps are the same. If you get the following error
upgrade to version -41 is required.
please run the command start namenode -upgrade and restart your Hadoop server.
Now let's install and configure Hive.
1) Download Hive 0.9.0 and unzip it somewhere on your local machine.
2) Set HIVE_HOME in your bash_profile and add it to your PATH environment variable.
3) Create the data warehouse directory in HDFS
$HADOOP_HOME/bin/hadoop fs -mkdir       /user/hive/warehouse
4) Make it group-writable (chmod g+w)
$HADOOP_HOME/bin/hadoop fs -chmod g+w   /user/hive/warehouse
5) Run Hive with the command $HIVE_HOME/bin/hive, or just hive (if you added $HIVE_HOME/bin to your PATH)
6) Create a Hive database
hive> create database test;
7) Use the database
hive> use test;
8) Create a local Hive table in the database test
hive> CREATE TABLE hpokes (foo INT, bar STRING);
9) Load some data into the table
hive> LOAD DATA LOCAL INPATH '$HIVE_HOME/examples/files/kv1.txt' OVERWRITE INTO TABLE hpokes;
10) Run the following query
hive> select * from hpokes;
The command should end with a long query result
29 val_30
242 val_243
285 val_286
35 val_36
227 val_228
395 val_396
244 val_245
Time taken: 0.334 seconds
If something goes wrong, check your installation; I used the following quick start guide.
11) Run an aggregate query
hive> select count(*) from hpokes;
The above command should start a Hadoop MapReduce job. The progress of the job is shown in the console, and it should end with the following messages
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2013-09-22 20:02:43,178 Stage-1 map = 0%,  reduce = 0%
2013-09-22 20:02:49,214 Stage-1 map = 100%,  reduce = 0%
2013-09-22 20:03:00,403 Stage-1 map = 100%,  reduce = 33%
2013-09-22 20:03:01,418 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201309221750_0009
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   HDFS Read: 11870 HDFS Write: 5 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
Time taken: 40.444 seconds
Now it's time to install and run Cassandra.
12) Download Cassandra version 1.2.6 and install it following the quick start guide, or check my previous post for a quick start.
13) Create the following keyspace and CF
CREATE KEYSPACE test WITH replication = {
  'class': 'SimpleStrategy',
  'replication_factor': '1'
};
Of course you can also create the keyspace and CF from Hive, as we will see later.
14) Now clone the cassandra-handler project from git
git clone cassandra-hive
Or you can download the zip from git and unzip it into any folder
15) In my case I changed the Hadoop core version in pom.xml, because I am using Hadoop 1.2.1
16) Compile and build the project
mvn clean install
17) Copy the following libraries from /target and /target/dependencies to the $HIVE_HOME/lib and $HADOOP_HOME/lib directories
18) Restart Hive and Hadoop.

19) Now we create the Cassandra CF from Hive
hive> use test;
hive> CREATE EXTERNAL TABLE test.pokes(foo int, bar string)
    STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler'
    WITH SERDEPROPERTIES ("cql.primarykey" = "foo", "comment"="check", "read_repair_chance" = "0.2",
    "dclocal_read_repair_chance" = "0.14", "gc_grace_seconds" = "989898", "bloom_filter_fp_chance" = "0.2",
    "compaction" = "{'class' : 'LeveledCompactionStrategy'}", "replicate_on_write" = "false", "caching" = "all");
20) Let's insert some data from the Hive table hpokes into the Cassandra table pokes
hive> insert into table pokes select * from hpokes;
It should start a Hadoop MapReduce job and insert the data from the hpokes table into the Cassandra pokes table.
Hadoop job information for Stage-0: number of mappers: 1; number of reducers: 0
2013-09-22 18:01:19,671 Stage-0 map = 0%,  reduce = 0%
2013-09-22 18:01:29,811 Stage-0 map = 100%,  reduce = 0%
2013-09-22 18:01:34,866 Stage-0 map = 100%,  reduce = 100%
Ended Job = job_201309221750_0005
1004 Rows loaded to pokes
MapReduce Jobs Launched: 
Job 0: Map: 1   HDFS Read: 11870 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
Time taken: 40.162 seconds

In my case it inserted 1004 rows into the table.
21) Now you can run any analytical query on the table pokes as follows
hive> select count(*) from pokes;
The above command also runs a Hadoop MapReduce job and should return the following messages
2013-09-22 18:13:40,390 Stage-1 map = 0%,  reduce = 0%
2013-09-22 18:13:58,229 Stage-1 map = 100%,  reduce = 0%
2013-09-22 18:14:12,642 Stage-1 map = 100%,  reduce = 33%
2013-09-22 18:14:13,649 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201309221750_0007
MapReduce Jobs Launched: 
Job 0: Map: 2  Reduce: 1   HDFS Read: 830 HDFS Write: 4 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
Time taken: 70.168 seconds
22) Now insert a row into the Cassandra CF through cqlsh
cqlsh> insert into pokes(foo, bar) values(1000, 'test');
23) Run the following query from Hive to find the row
hive> select * from pokes where foo=1000;
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 0
2013-09-22 18:14:52,545 Stage-1 map = 0%,  reduce = 0%
2013-09-22 18:15:02,674 Stage-1 map = 100%,  reduce = 0%
2013-09-22 18:15:07,756 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201309221750_0008
MapReduce Jobs Launched: 
Job 0: Map: 2   HDFS Read: 830 HDFS Write: 10 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
1000 test
Time taken: 30.891 seconds
We have reached the end of the post. Thank you, everybody, for visiting the blog. Happy blogging. In the next part we will install Shark and Spark for real time data processing.


Big Data Moscow meetup - Presentation from AT Consulting

Our presentation at the Big Data Moscow meetup describes our use case, the problems we solved, and how we used Cassandra and Hadoop for big data processing.

An impatient start with Cascading

Over the last couple of years working with Hadoop, I have heard in many blogs and conferences about the Cascading framework, which runs on top of Hadoop to implement ETL. But in our lean startup project we decided to use Pig, and we implemented our data flow based on Pig. Recently I got a new book from O'Reilly Media, "Enterprise Data Workflows with Cascading", and finished two interesting chapters in one breath. This weekend I managed to find a couple of hours to try some examples from the book. The author, Paco Nathan, explains very nicely why and when you should use Cascading instead of Pig or Hive, and he also gives examples to try at home. Today's post is about my first impression of Cascading.

All the examples from the book can be found on GitHub. I cloned the project from GitHub and was ready to run the examples. The Impatient project compiles and builds with Gradle. I ran gradle clean jar and got stuck with the following errors:
Could not resolve all dependencies for configuration ':providedCompile'.
> Could not download artifact 'org.codehaus.jackson:jackson-core-asl:1.8.8@jar'
   > Artifact 'org.codehaus.jackson:jackson-core-asl:1.8.8@jar' not found.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.
I am not a Gradle geek and didn't find a quick fix for this, so I decided to use Maven to compile and build the examples (however, it would be great if somebody could point out how to fix it). Furthermore, the author doesn't describe how to install a standalone local Hadoop; certainly, installing Hadoop is out of the scope of the book. However, we have to install a local standalone Hadoop to run all the examples. There are plenty of blogs and articles that briefly describe how to install Hadoop. Since this is an impatient start, we will install Hadoop as quickly as possible.
1) Download the Hadoop distribution.
2) Unzip the Hadoop distribution into any folder.
3) Add HADOOP_HOME to your environment path.
Actually we are ready to go, but it's better to check three files in the conf folder: core-site.xml, hdfs-site.xml and mapred-site.xml. For local use, none of the three files should contain any configuration.
4) Add the following pom.xml file to the directory part1
<project xmlns="" xmlns:xsi=""









5) Run mvn clean install; it should create a fat jar file, casscading-1.0-jar-with-dependencies.jar
6) Now we are ready to run the example part1
rm -rf output
hadoop jar ./target/casscading-1.0-jar-with-dependencies.jar data/rain.txt ./output/rain
I got the following output on the console
13/08/11 20:53:11 INFO util.HadoopUtil: resolving application jar from found main method on: impatient.Main
13/08/11 20:53:11 INFO planner.HadoopPlanner: using application jar: /Users/samim/Development/NoSQL/casscading/./target/casscading-1.0-jar-with-dependencies.jar
13/08/11 20:53:11 INFO property.AppProps: using 96618C129BE0C7C591007BCFB650BAE6
2013-08-11 20:53:11.650 java[2161:1903] Unable to load realm info from SCDynamicStore
13/08/11 20:53:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/08/11 20:53:12 WARN snappy.LoadSnappy: Snappy native library not loaded
13/08/11 20:53:12 INFO mapred.FileInputFormat: Total input paths to process : 1
13/08/11 20:53:12 INFO util.Version: Concurrent, Inc - Cascading 2.1.2
13/08/11 20:53:12 INFO flow.Flow: [] starting
13/08/11 20:53:12 INFO flow.Flow: []  source: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
13/08/11 20:53:12 INFO flow.Flow: []  sink: Hfs["TextDelimited[[UNKNOWN]->['doc_id', 'text']]"]["output/rain"]"]
13/08/11 20:53:12 INFO flow.Flow: []  parallel execution is enabled: false
13/08/11 20:53:12 INFO flow.Flow: []  starting jobs: 1
13/08/11 20:53:12 INFO flow.Flow: []  allocating threads: 1
13/08/11 20:53:12 INFO flow.FlowStep: [] at least one sink does not exist
13/08/11 20:53:12 INFO flow.FlowStep: [] source modification date at: Sun Aug 11 18:50:44 MSK 2013
13/08/11 20:53:12 INFO flow.FlowStep: [] starting step: (1/1) output/rain
13/08/11 20:53:12 INFO mapred.FileInputFormat: Total input paths to process : 1
13/08/11 20:53:13 INFO flow.FlowStep: [] submitted hadoop job: job_local_0001
13/08/11 20:53:13 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
13/08/11 20:53:13 INFO io.MultiInputSplit: current split input path: file:/Users/samim/Development/NoSQL/casscading/data/rain.txt
13/08/11 20:53:13 INFO mapred.MapTask: numReduceTasks: 0
13/08/11 20:53:13 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.1.2
13/08/11 20:53:13 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/08/11 20:53:13 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
13/08/11 20:53:13 INFO hadoop.FlowMapper: sinking to: Hfs["TextDelimited[[UNKNOWN]->['doc_id', 'text']]"]["output/rain"]"]
13/08/11 20:53:13 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/08/11 20:53:13 INFO mapred.LocalJobRunner: 
13/08/11 20:53:13 INFO mapred.Task: Task attempt_local_0001_m_000000_0 is allowed to commit now
13/08/11 20:53:13 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_m_000000_0' to file:/Users/samim/Development/NoSQL/casscading/output/rain
13/08/11 20:53:16 INFO mapred.LocalJobRunner: file:/Users/samim/Development/NoSQL/casscading/data/rain.txt:0+510
13/08/11 20:53:16 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/08/11 20:53:18 INFO util.Hadoop18TapUtil: deleting temp path output/rain/_temporary
7) To view the results:
cat ./output/rain/*
Here is the output
doc_id text
doc01 A rain shadow is a dry area on the lee back side of a mountainous area.
doc02 This sinking, dry air produces a rain shadow, or area in the lee of a mountain with less rain and cloudcover.
doc03 A rain shadow is an area of dry land that lies on the leeward (or downwind) side of a mountain.
doc04 This is known as the rain shadow effect and is the primary cause of leeward deserts of mountain ranges, such as California's Death Valley.
doc05 Two Women. Secrets. A Broken Land. [DVD Australia]
Shamim-2:casscading samim$ hadoop jar ./target/casscading-1.0-jar-with-dependencies.jar  data/rain.txt ./output/wc
Everything seems to work fine. I also ran the word count example.
Actually I am interested in manipulating Cassandra data through Cascading, and also Lingual. I have already found a few GitHub projects that implement a Cassandra Tap and Sink for easy integration; thank you guys for the nice work.


JVM multi-tenancy with ElastiCat

One of the speakers at the last JavaOne Moscow mentioned JVM multi-tenancy and announced its availability in Java 9. At the end of last year IBM also announced that their IBM J9 JVM can host multiple applications. Out of curiosity I became interested in the topic, and from time to time I read a few articles and watched presentations. For those not yet familiar with it, JVM multi-tenancy means hosting more than one application on a single JVM. Here you will find a very good article about JVM multi-tenancy. Last Friday I got an e-mail from the DZone mailing list about ElastiCat, which provides JVM multi-tenancy and much more. After spending some time I downloaded the product, deployed a simple web service on it and played with the JVM. Today I found a few spare hours after lunch and decided to share my experience with ElastiCat.
ElastiCat, as its authors declare, provides multitenancy, isolation, density and elasticity through Tomcat. You can deploy more than one application on a single Tomcat and share infrastructure resources. ElastiCat provides a virtualization layer that isolates applications in Java Virtual Containers (JVCs). This means that when you deploy your application in ElastiCat, you deploy it into a Java Virtual Container and you have control over that JVC: you can give it more heap memory or any other extra resources. OK, let's try a quick test.
1) Download ElastiCat and extract it into any directory; please see the prerequisites first. Currently ElastiCat doesn't support JDK 1.7.
2) Run the servlet container.
3) Develop a quick web service for testing purposes.
4) Deploy the web service war in the directory $ELASTICAT_HOME/cloudapps. If you want to deploy in your application host JVM, you can use $ELASTICAT_HOME/webapps.
5) ElastiCat also provides a few examples to try; one of them, test-app, can be reached at http://localhost:8080/test-app
6) Now we can use the jirsh shell, a command line administrative interface to the Waratek CloudVM for Java. The Waratek jirsh shell is based on the libvirt virsh shell.
7) Connect from a terminal with ssh -p 2222 super@localhost and use the word super as the password.
8) Run the command list, which will show the following output (yours may differ)
JVCID    GROUP    STATUS          NAME                    COMMAND
0        0        Running         dom-0                   platform
1        0        Running         jvc-1                   /examples1.war
2        0        Running         jvc-2                   /examples2.war
3        0        Running         jvc-3                   /test-app.war
4        0        Running         jvc-4                   /test-infras.war
My web service artifact test-infras.war is deployed in jvc-4; we can check its configuration by running the command dominfo
dominfo 4

JVCID:                          4
JVC-NAME:                       jvc-4
JVC type:                       ServletContainer
JVC command line:               /test-infras.war
Console log file:               /home/xx/ccc/elasticcat/waratek-elasticat-0.9.2/waratek/var/log/javad/elasticat/jvc-4/stdout
JVC status:                     Running
JVC persistence:                true
JVC priority:                   10
JVC elastic group:              0
JVC uptime:                     21 minutes, 3.237 seconds
JVC cpu usage:                  0.0010 GHz-hours (1.570 seconds)
Maximum heap memory:            0 (unlimited)
Allowed elastic memory:         0KiB
Used heap memory:               1.16MiB
Classloader count:              4
Total classes loaded:           2470
Thread maximum limit:           0 (unlimited)
Alive thread count:             1
Alive daemon threads:           1
Peak thread count:              1
Total started threads:          1
Alive thread IDs:               57
Number of host processors:      4
Number of JVC processors:       4
Cpu affinity:                   true, true, true, true,
File descriptor limit:          0 (unlimited)
File descriptor count:          10
File bytes written:             10781 (10.53KiB)
File bytes read:                21281 (20.78KiB)
Socket maximum limit:           0 (unlimited)
Active socket count:            0
Network bytes written:          5696 (5.56KiB)
Network bytes read:             1232 (1.2KiB)
Native library loading is:      Enabled
Virtual root directory:         "/"
9) Let's modify the web service to leak some memory; it's perhaps the easiest way to provoke OOM errors.
10) Now we are going to fix the heap memory size for the JVC by running the command setmem
setmem 4 10000 # add about 10 MB heap 
dominfo 4

JVCID:                          4
JVC-NAME:                       jvc-4
JVC type:                       ServletContainer
JVC command line:               /test-infras.war
JVC status:                     Running
JVC persistence:                true
JVC priority:                   10
JVC elastic group:              0
JVC uptime:                     30 minutes, 3.445 seconds
JVC cpu usage:                  0.0009 GHz-hours (1.410 seconds)
Maximum heap memory:            9.77MiB
Allowed elastic memory:         0KiB
Used heap memory:               1.18MiB
Classloader count:              4
Total classes loaded:           2449
11) Now we are ready to invoke the web service method to fill up the heap and get an OOM error; run the web service client a few times
setmem 4 10000 # add about 10 MB heap 
dominfo 4

JVCID:                          4
JVC-NAME:                       jvc-4
JVC type:                       ServletContainer
JVC command line:               /test-infras.war
JVC status:                     Running
JVC persistence:                true
JVC priority:                   10
JVC elastic group:              0
JVC uptime:                     30 minutes, 3.445 seconds
JVC cpu usage:                  0.0009 GHz-hours (1.410 seconds)
Maximum heap memory:            9.77MiB
Allowed elastic memory:         0KiB
Used heap memory:               9.43MiB
Classloader count:              4
Total classes loaded:           2449
12) Check the ElastiCat log file
WARNING: Interceptor for TestService#sayHello has thrown exception, unwinding now
 at org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(
 at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(
 at org.apache.cxf.phase.PhaseInterceptorChain.resume(
 at org.apache.cxf.interceptor.OneWayProcessorInterceptor$
 at org.apache.cxf.workqueue.AutomaticWorkQueueImpl$
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(
 at java.util.concurrent.ThreadPoolExecutor$
 at java.lang.Thread.begin(
 at java.lang.Thread.invokeRun(
 at java.lang.reflect.VMReflection.invokeMethod(
 at java.lang.reflect.Method$NativeAccessor.invoke(
 at java.lang.reflect.MethodNativeAccessorImpl.invoke(
 at java.lang.reflect.Method.invoke(
Caused by: java.lang.OutOfMemoryError
Yes, we are already stuck with the OOM error on the server. At this moment we can check the other applications deployed in ElastiCat; in my case the other applications ran well, although I only checked test-app.
Here we can add more resources (heap memory) to get rid of the OOM error and continue executing the web service method.
13) Add 10 MB more heap to the 4th JVC
setmem 4 10000
dominfo 4

JVCID:                          4
JVC-NAME:                       jvc-4
JVC type:                       ServletContainer
JVC command line:               /test-infras.war
JVC status:                     Running
JVC persistence:                true
JVC priority:                   10
JVC elastic group:              0
JVC uptime:                     30 minutes, 3.445 seconds
JVC cpu usage:                  0.0009 GHz-hours (1.410 seconds)
Maximum heap memory:            18.65MiB
Allowed elastic memory:         0KiB
Used heap memory:               9.43MiB
Classloader count:              4
Total classes loaded:           2449
Run the web service client and check the log file; you should find that the OOM error is gone and the web service continues executing normally.
With the jirsh command line client you can also stop, start and resume a JVC; the full list of commands can be found here.
In my experience it works like a charm, however it's very hard to say what the actual performance will be in a production environment with a high-load application. All the JVCs share the host JVM's GC and other resources. At first glance the bottleneck would be the GC, but as a first step towards JVM multi-tenancy ElastiCat is a good choice.


Hadoop Map reduce with Cassandra Cql through Pig

One of the main disadvantages of using Pig is that it always reads all the data from Cassandra storage, and only after that can it filter by your criteria. It's easy to imagine what the workload looks like if you have hundreds of millions of rows in your CF. For example, in our production environment we always have more than 300 million rows, of which only 20-25 million are unprocessed. When we execute a Pig script, we get more than 5000 map tasks over all 300 million rows. It is time-consuming, high-load batch processing that we always tried to avoid, but in vain. It would be very nice if we could use a CQL query with a where clause in Pig scripts to select and filter our data. The benefit is clear: less data consumed, fewer map tasks and a lighter workload.

This feature is still not available in the latest version of Cassandra (1.2.6). It is planned for the next version, Cassandra 1.2.7. However, a patch for this feature is already available, and with a little effort we can give it a try.
First we have to download the Cassandra source code from the 1.2 branch. We should also have a configured Hadoop cluster with Pig.
1) Download the Cassandra source code from branch 1.2
git clone -b cassandra-1.2
I assume that you are already familiar with git.
Then apply the patch fix_where_clause.patch

Now compile the source code and set up the cluster. For testing purposes I am using my single node Hadoop 1.1.2 + Cassandra 1.2.7 + Pig 0.11.1 cluster.
2) To set up a single node cluster, please see A single node Hadoop + Cassandra + Pig setup
3) Create a CF as follows:
CREATE TABLE test (
  id text PRIMARY KEY,
  title text,
  age int
);
and insert some dummy data
insert into test (id, title, age) values('1', 'child', 21);
insert into test (id, title, age) values('2', 'support', 21);
insert into test (id, title, age) values('3', 'manager', 31);
insert into test (id, title, age) values('4', 'QA', 41); 
insert into test (id, title, age) values('5', 'QA', 30); 
insert into test (id, title, age) values('6', 'QA', 30); 
4) Execute the following Pig script
rows = LOAD 'cql://keyspace1/test?page_size=1&columns=title,age&split_size=4&where_clause=age%3D30' USING CqlStorage();
dump rows;
You should get the following result on the Pig console
Let's check the Hadoop job history page

The number of map input records equals 2.
With this new feature we can use a where clause to select just the data we want from Cassandra storage. You can also check the JIRA issue tracker for more details.
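Note that the where clause in the LOAD location is URL-encoded: age%3D30 is simply age=30 with the equals sign escaped. For more complex conditions it is easy to get the escaping wrong by hand, so here is a small sketch that builds the location string. The helper name cql_location is my own for illustration; the parameter names are just the ones that appear in the script above.

```python
from urllib.parse import quote


def cql_location(keyspace, table, columns, where_clause, page_size=1, split_size=4):
    """Build a cql:// location string with a URL-encoded where clause."""
    # safe="" makes quote() escape every reserved character, including "=".
    encoded_where = quote(where_clause, safe="")
    return (
        f"cql://{keyspace}/{table}"
        f"?page_size={page_size}"
        f"&columns={','.join(columns)}"
        f"&split_size={split_size}"
        f"&where_clause={encoded_where}"
    )


location = cql_location("keyspace1", "test", ["title", "age"], "age=30")
print(location)
```

The printed string can then be pasted into the LOAD statement of the Pig script.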
All the credit goes to Alex Lui, who implemented this feature.


Leader election of Camel router through Zookeeper

Most modern distributed systems sooner or later come across the problem where only one instance of a process or job should run while the other instances of the same job or process stand by. The Apache ZooKeeper project provides this functionality out of the box. I am not going to describe all the use cases or problems that can be solved with ZooKeeper. You can read the following great article for a quick start.

Let me describe our use case in more detail: we have an Apache Camel component that polls some resources, and only one instance of the component should run at any time. It means there should be one leader and a few followers. When the master (leader) goes down, one of the followers should replace it. You can also use doozerd instead of ZooKeeper for leader election, but for me ZooKeeper is easier to maintain.

It's very easy to install a multi-server ZooKeeper cluster. I recommend following the steps from this blog to set up a cluster. Now that our cluster setup is done, we are ready to move on to Camel.
For leader election in Camel we can apply a ZooKeeperRoutePolicy to a Camel route as follows:
ZooKeeperRoutePolicy zooKeeperRoutePolicy = new ZooKeeperRoutePolicy("zookeeper://", 1);
For details please see the link.
Once the policy is defined, it can be plugged into a route as follows:
The above route will execute every 50 seconds and will send a message to the queue xyz if it can bind the relevant znode. What is going on under the hood? Assume we have a znode named ELECTION in our cluster. When we start the first instance of this Camel route, it creates an ephemeral znode as follows:
[zk: 0] ls /ELECTION
When the Camel route disconnects, the znode is deleted automatically. If we start another instance of this Camel route, we get the following znodes:
[zk: 0] ls /ELECTION
[602-bsha-61918db4-6ebe-4283-b381-c3c4688920f10000000006, 602-bsha-154fcb3e-2768-4d24-9803-92b9d2d9c77c0000000005]
By the ZooKeeper leader election algorithm, the ephemeral znode 602-bsha-154fcb3e-2768-4d24-9803-92b9d2d9c77c0000000005 will be the leader, because it has the smallest sequence number.
If we stop the first Camel route, the znode 602-bsha-61918db4-6ebe-4283-b381-c3c4688920f10000000006 will become the leader and the second Camel route will start polling.
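The selection rule itself is simple enough to sketch in a few lines. The snippet below is not Camel or ZooKeeper code; it only reproduces the rule on the two znode names listed above, relying on the fact that ZooKeeper appends a 10-digit, zero-padded counter to sequential znodes. The function names sequence_number and elect_leader are my own.

```python
import re


def sequence_number(znode_name):
    """Return the 10-digit counter ZooKeeper appends to a sequential znode name."""
    match = re.search(r"(\d{10})$", znode_name)
    if match is None:
        raise ValueError(f"no sequence suffix in {znode_name!r}")
    return int(match.group(1))


def elect_leader(children):
    """The child with the smallest sequence number is the leader."""
    return min(children, key=sequence_number)


# The two children of /ELECTION listed above.
children = [
    "602-bsha-61918db4-6ebe-4283-b381-c3c4688920f10000000006",
    "602-bsha-154fcb3e-2768-4d24-9803-92b9d2d9c77c0000000005",
]

leader = elect_leader(children)
print(leader)

# When the leader's ephemeral znode disappears, the next one takes over.
remaining = [name for name in children if name != leader]
print(elect_leader(remaining))
```

The order in which ls returns the children does not matter; only the numeric suffix decides who leads.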
Now imagine what happens if the first ZooKeeper server fails. Our first Camel route would fail too, because it is connected to only one ZooKeeper server. For high availability, to connect to more ZooKeeper servers from the Camel ZooKeeperRoutePolicy, we have to add more server IP addresses and ports to the configuration. But the Camel ZooKeeperRoutePolicy doesn't provide this functionality out of the box. One of my colleagues found a very quick fix for this problem using the ZooKeeperConfiguration class.
First we have to add the bean to the Spring context, so it can be autowired.
<bean id="zookeeperComponent" class="org.apache.camel.component.zookeeper.ZooKeeperComponent"/>
After that we can set ZooKeeperConfiguration properties on the above component as follows:
ZooKeeperConfiguration zooConfig = new ZooKeeperConfiguration();
This lets the Camel route bind to another ZooKeeper server for high availability. If one of the ZooKeeper servers fails, the Camel route will keep running.

Lesson learned : Hadoop + Cassandra integration

After a break of a few weeks, we have at last completed the tuning and configuration of our Cassandra + Hadoop stack in production. It was exciting, and I decided to share our experience with everyone.
1) Cassandra versions from 1.2 onward have some problems and don't integrate with Hadoop very well. The problem is with MapReduce: when we run any MapReduce job, it always assigns only one mapper regardless of the amount of data. See here for more detail.
2) If you are going to use Pig for your data analysis, think twice, because Pig always picks up all the data from Cassandra storage and only after that can it filter. If you have billions of rows and need to aggregate only a few million of them, Pig will still pick up the billions of rows. Here you can find a comparison of Hadoop frameworks for executing MapReduce.
3) If you are using Pig, filter rows as early as possible. Filter out fields that are null or empty.
4) When using Pig, try to model your CF slightly differently. Use the bucket pattern: store your data by weeks or months, which is better than storing all the data in one CF. Consider using TTL.
5) If you have more than 8 GB of heap, consider the IBM J9 JVM or the Azul JVM.
6) Always use a separate hard disk (high speed, i.e. more than 7200 rpm) for the Cassandra commit log.
7) Size your hardware carefully; choose between RAID 5 and RAID 10 depending on your needs. It's better to create a separate LUN for every node.
8) Tune the bloom filter if you have an analytical node. See here for more information.
9) Tune your Cassandra CF and use the row cache. The Cassandra row cache is an off-heap cache, like memcached. It's slightly slower than an on-heap cache but much faster than disk I/O.
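To make the bucket pattern from item 4 a bit more concrete, here is one possible way to derive a weekly bucket name from a date, so that each week's data lands in its own CF and a Pig job only has to read the buckets it needs. This is just a sketch of the idea, not our production code; the base name events and the naming scheme are made up for the example.

```python
from datetime import date


def weekly_bucket(base_name, day):
    """Return a per-week CF name such as events_2013w38 (ISO week numbering)."""
    iso_year, iso_week, _ = day.isocalendar()
    return f"{base_name}_{iso_year}w{iso_week:02d}"


# Hypothetical base name "events"; two consecutive days in different ISO weeks.
sunday = weekly_bucket("events", date(2013, 9, 22))
monday = weekly_bucket("events", date(2013, 9, 23))
print(sunday, monday)
```

The same function could just as well produce a bucket prefix for the row key instead of a CF name; which variant fits better depends on your data model.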

Everything described above is my own experience and may differ from the experience of others.
Update 1: the bug is already fixed in version 1.2.6.