Patch pig_cassandra for setting TTL on Cassandra data

Apache Pig provides a platform for analyzing very large data sets, and with it you can easily analyze your data from Cassandra. Pig compiles instructions into sequences of Map-Reduce programs which run on a Hadoop cluster. The Cassandra source distribution provides a simple pig script for running Pig against Cassandra data. Cassandra also provides the CassandraStorage class, which loads and stores data from the Cassandra DB, but this class has no built-in support for storing data with a TTL (time to live). In many cases you need to update a few columns or rows with a TTL so that they are later deleted from the DB automatically. For that, I have patched the CassandraStorage class and added this functionality. Here is the patch:
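As a reminder of the semantics: a column written with a TTL of 86400 seconds becomes eligible for automatic deletion one day after the write. A minimal sketch of the expiry rule (the class and method names here are illustrative, not Cassandra internals):

```java
public class TtlDemo {
    // A column with ttlSeconds == 0 never expires; otherwise it expires
    // once `now` passes the write time plus the TTL.
    static boolean isExpired(long writtenAtMillis, int ttlSeconds, long nowMillis) {
        return ttlSeconds > 0 && nowMillis >= writtenAtMillis + ttlSeconds * 1000L;
    }

    public static void main(String[] args) {
        long writtenAt = 0L;   // written at the epoch, for simplicity
        int oneDay = 86400;    // TTL of one day, in seconds
        System.out.println(isExpired(writtenAt, oneDay, 86_400_000L)); // expired
        System.out.println(isExpired(writtenAt, 0, 86_400_000L));      // no TTL set
    }
}
```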
Index: src/main/java/ru/atc/smev/cassandra/storage/
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
--- src/main/java/ru/atc/smev/cassandra/storage/ (revision 3711)
+++ src/main/java/ru/atc/smev/cassandra/storage/ (revision )
@@ -90,7 +90,7 @@
     private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-    private static final Log logger = LogFactory.getLog(CassandraStorage.class);
+    private static final Log logger = LogFactory.getLog(MyCassandraStorage.class);
     private ByteBuffer slice_start = BOUND;
     private ByteBuffer slice_end = BOUND;
@@ -113,6 +113,7 @@
    private Map<ByteBuffer, IColumn> lastRow;
     private boolean hasNext = true;
+    private int ttl;
     public CassandraStorage()
@@ -131,8 +132,13 @@
     public int getLimit()
         return limit;
+    public int getTtl() {
+        return ttl;
+    }
     public Tuple getNextWide() throws IOException
         CfDef cfDef = getCfDef(loadSignature);
@@ -337,14 +343,14 @@
     private CfDef getCfDef(String signature)
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         return cfdefFromString(property.getProperty(signature));
    private List<IndexExpression> getIndexExpressions()
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
             return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
@@ -462,6 +468,8 @@
                     slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
                 if (urlQuery.containsKey("limit"))
                     limit = Integer.parseInt(urlQuery.get("limit"));
+                if(urlQuery.containsKey("ttl"))
+                    ttl = Integer.parseInt(urlQuery.get("ttl"));
             String[] parts = urlParts[0].split("/+");
             keyspace = parts[1];
@@ -469,7 +477,7 @@
         catch (Exception e)
-            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
+            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&ttl=86400]]': " + e.getMessage());
@@ -694,7 +702,7 @@
     public void setPartitionFilter(Expression partitionFilter)
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
@@ -901,6 +909,11 @@
+            if(getTtl() != 0){
+                column.setTtl(getTtl());
+                column.setTtlIsSet(true);
+            }
             mutation.column_or_supercolumn = new ColumnOrSuperColumn();
             mutation.column_or_supercolumn.column = column;
@@ -924,6 +937,11 @@
+                    if(getTtl() != 0){
+                        column.setTtl(getTtl());
+                        column.setTtlIsSet(true);
+                    }
                 if (columns.isEmpty())
@@ -980,7 +998,7 @@
     private void initSchema(String signature)
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         // Only get the schema if we haven't already gotten it
         if (!property.containsKey(signature))
You can build Cassandra with the above patch and use it in your Pig script as follows:
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING CassandraStorage();
Or you can use the new class, named MyCassandraStorage, as a Pig UDF. First you have to compile and package the class into a jar. Then you can define the class in your Pig script and use it as follows:
DEFINE MyCassandraStorage ru.atc.smev.cassandra.storage.MyCassandraStorage();
raw = LOAD 'cassandra://audit/auditlog' USING MyCassandraStorage();
filtered = FILTER raw BY processed.$1 == 'N';
updated = FOREACH filtered GENERATE  key,TOTUPLE('processed','Y');
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING MyCassandraStorage();
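For reference, the ttl handling that the patch adds to the URL parsing can be sketched standalone. The parsing below mirrors the patched logic (ttl stays 0, and is therefore never applied to columns, unless the storage URL carries a ttl parameter), but it uses a hypothetical helper class rather than the real CassandraStorage code:

```java
import java.util.HashMap;
import java.util.Map;

public class TtlUrlSketch {
    // Extract the ttl parameter (in seconds) from a storage URL such as
    // cassandra://audit/auditlog?ttl=86400; returns 0 when no ttl is given,
    // matching the patch's "only set TTL when nonzero" behavior.
    static int parseTtl(String location) {
        int q = location.indexOf('?');
        if (q < 0) return 0;  // no query string at all
        Map<String, String> urlQuery = new HashMap<String, String>();
        for (String pair : location.substring(q + 1).split("&")) {
            String[] kv = pair.split("=", 2);
            urlQuery.put(kv[0], kv.length > 1 ? kv[1] : "");
        }
        return urlQuery.containsKey("ttl") ? Integer.parseInt(urlQuery.get("ttl")) : 0;
    }

    public static void main(String[] args) {
        System.out.println(parseTtl("cassandra://audit/auditlog?ttl=86400")); // 86400
        System.out.println(parseTtl("cassandra://audit/auditlog"));           // 0
    }
}
```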


Another Cassandra data manipulation API - PlayOrm

Recently I found an interesting project on GitHub named PlayOrm, whose features impressed me very much, so I decided to just play with it. Let's first check out their feature list:

  1. Just added support for Entity has a Cursor instead of List which is lazy read to prevent out of memory on VERY wide rows
  2. PlayOrm Queries use way less resources from cassandra cluster than CQL queries
  3. Scalable JQL (SJQL) supported, which is a modified JQL that scales (SQL doesn't scale well)
  4. Partitioning so you can query a one trillion row table in just ms with SJQL(Scalable Java Query Language)
  5. Typical query support of <=, <, >, >= and = and no limitations here
  6. Typical query support of AND and OR as well as parenthesis
  7. Inner Join support (Must keep your very very large tables partitioned so you get very fast access times here)
  8. Left Outer Join support
  9. Return Database cursor on query
  10. OneToMany, ManyToMany, OneToOne, and ManyToOne but the ToMany's are nosql fashion not like RDBMS
  11. support of a findAll(Class c, List keys) as is typical in nosql to parallel the reads
  12. Inheritance class hierarchy in one table is supported, like Hibernate
  13. flush() support - We protect you from failures!!!
  14. first level read cache
  15. Automatically creates ColumnFamilies at runtime
  16. Includes its own in-memory database for TDD in your unit tests!!!!!
  17. Saves you MORE data storage compared to other solutions
  18. logging interface below the first level cache so you can see the raw
    operations on cassandra and optimize just like when you use hibernate's
  19. A raw interface using only BigDecimal, BigInteger, and String types
    which is currently used to upload user defined datasets through a web
    interface(and we wire that into generating meta data so they can ad-hoc
    query on the nosql system)
  20. An ad-hoc query interface that can query on any table that was from
    an Entity object. To us on other tables, you can also code up and save
    DboTableMeta objects and the ad-hoc query interface gets you query
    support into those tables
  21. IF you have some noSQL data and some Relational data, store your
    relational data in noSQL now and just maintain one database in
  22. support for joda-time LocalDateTime, LocalDate, LocalTime which
    works way better than java's Date object and is less buggy than java's
    Date and Calendar objects
  23. Command Line tool.  
Impressive, yeah? Feature 4, partitioning, can replace Cassandra's composite primary key feature. What I did: I just cloned the project from GitHub, imported it into IntelliJ IDEA, and started coding.

First I tried out the Inner Join feature.

1) Start my local Cassandra database.
2) Create a keyspace named MyKeyspace through CQL as follows:

CREATE KEYSPACE MyKeyspace WITH strategy_class='SimpleStrategy'
 AND strategy_options:replication_factor=1;
3) Create two simple Java POJOs with PlayOrm annotations.
Entity Log, which has a one-to-one relation with Entity Event:
import java.util.Date;

import com.alvazan.orm.api.base.anno.NoSqlEntity;
import com.alvazan.orm.api.base.anno.NoSqlId;
import com.alvazan.orm.api.base.anno.NoSqlIndexed;
import com.alvazan.orm.api.base.anno.NoSqlOneToOne;
import com.alvazan.orm.api.base.anno.NoSqlQuery;

@NoSqlEntity
@NoSqlQuery(name="findlog", query="select * FROM Log as l INNER JOIN l.event as ee where l.user=:user")
public class Log {
    @NoSqlId
    private int id;
    private String msg;
    @NoSqlIndexed            // the named query filters on user, so it must be indexed
    private String user;
    private Date time;
    @NoSqlOneToOne           // one-to-one relation used by the INNER JOIN above
    private Event event;

    public int getId() { return id; }
    public void setId(int id) { = id; }
    public String getMsg() { return msg; }
    public void setMsg(String msg) { this.msg = msg; }
    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public Date getTime() { return time; }
    public void setTime(Date time) { this.time = time; }
    public Event getEvent() { return event; }
    public void setEvent(Event event) { this.event = event; }
}
Entity Event:

import com.alvazan.orm.api.base.anno.NoSqlEntity;
import com.alvazan.orm.api.base.anno.NoSqlId;
import com.alvazan.orm.api.base.anno.NoSqlIndexed;
@NoSqlEntity
public class Event {
    @NoSqlId
    private int id;
    @NoSqlIndexed
    private String code;
    private String name;

    public int getId() { return id; }
    public void setId(int id) { = id; }
    public String getCode() { return code; }
    public void setCode(String code) { this.code = code; }
    public String getName() { return name; }
    public void setName(String name) { = name; }
}
For a quick start it is better to use PlayOrm's FactorySingleton, which you can find in the test package:
package com.alvazan.test;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alvazan.orm.api.base.Bootstrap;
import com.alvazan.orm.api.base.DbTypeEnum;
import com.alvazan.orm.api.base.NoSqlEntityManagerFactory;
public class FactorySingleton {
    private static final Logger log = LoggerFactory.getLogger(FactorySingleton.class);
    private static NoSqlEntityManagerFactory factory;

    public static Config getConfigForAllTests() {
        String clusterName = "Test Cluster";
        //DbTypeEnum serverType = DbTypeEnum.IN_MEMORY;
        DbTypeEnum serverType = DbTypeEnum.CASSANDRA;
        String seeds = "localhost:9160";
        return new Config(serverType, clusterName, seeds);
    }

    public synchronized static NoSqlEntityManagerFactory createFactoryOnce() {
        if (factory == null) {
            Config config = getConfigForAllTests();
            //We used this below commented out seeds to test our suite on a cluster of 6 nodes to see if any issues pop up with more
            //nodes using the default astyanax consistency levels which I believe for writes and reads are both QUORUM
            //which is perfect for us as we know we will get the latest results
            //String seeds = ",,";
            Map<String, Object> props = new HashMap<String, Object>();
            factory = createFactory(config, props);
        }
        return factory;
    }

    public static NoSqlEntityManagerFactory createFactory(Config config, Map<String, Object> props) {"CREATING FACTORY FOR TESTS");
        props.put(Bootstrap.AUTO_CREATE_KEY, "create");
        switch (config.getServerType()) {
        case IN_MEMORY:
            //nothing to do
            break;
        case CASSANDRA:
            Bootstrap.createAndAddBestCassandraConfiguration(props, config.getClusterName(), "MyKeyspace", config.getSeeds());
            break;
        default:
            throw new UnsupportedOperationException("not supported yet, server type=" + config.getServerType());
        }
        NoSqlEntityManagerFactory factory = Bootstrap.create(config.getServerType(), props, null, null);
        return factory;
    }
}
Now it's time to put some data into Cassandra and run the managed query.

package com.alvazan.test;

import java.util.Date;
import java.util.List;

import com.alvazan.orm.api.base.NoSqlEntityManager;
import com.alvazan.orm.api.base.NoSqlEntityManagerFactory;
import com.alvazan.orm.api.base.Query;
import com.alvazan.test.mytest.Event;
import com.alvazan.test.mytest.Log;

public class BasicTest {
    public static void main(String[] args) {
        // create the connection factory and an entity manager
        NoSqlEntityManagerFactory factory = FactorySingleton.createFactoryOnce();
        NoSqlEntityManager mgr = factory.createEntityManager();
        // put some data into Cassandra (the original snippet never persisted it)
        Event event = new Event();
        event.setId(1);
        event.setName("Validation failed");
        Log log = new Log();
        log.setId(1);
        log.setUser("someUser");
        log.setTime(new Date(System.currentTimeMillis()));
        log.setEvent(event);
        mgr.put(event);
        mgr.put(log);
        mgr.flush();
        // run the managed (named) query declared on the Log entity
        Query query = mgr.createNamedQuery(Log.class, "findlog");
        query.setParameter("user", "someUser");
        List l = query.getResultList(0, 100);
        System.out.println("Result Size: " + l.size());
    }
}
For a partitioning query you have to define the managed query similarly:

PARTITIONS e(:partitionId) select * FROM TABLE as e WHERE e.user = :user
You will find most of the Cassandra examples in the com.alvazan.test package. At first glance the framework is very impressive, with a lot of unique features. For me it would be useful to reindex, or create a new index from existing data, through map/reduce. This feature is on their upcoming features list, and I will be very happy to see it in the next version.