Real time data processing with Storm, ETL from Oracle to Cassandra

Last couple of months we are using Apache Hadoop Map Reduce batch processing to analyze a huge amount of data. We have a few legacy product where we can't consider to using Cassandra big table database. A few of them uses Oracle Database as their primary storage. As our requirements we have to extract the data from the rdbms, parse the payload and load it to Cassandra for aggregating. Here i have decided to use Storm for real time data processing. Our usecase is as follows:
1) Storm spout connects to Oracle Database and collects data from particular table with some intervals.
2) Storm bolt parses the data with some fashion and emit to Storm-cassandra bolt to store the row into Cassandra DB.

Here is the fragment code of project. First i have create a Jdbc connector class, class contain a few class variables which contradicting with Storm ideology, as far i have just need one spout as input - it's enough for me.
package storm.contrib.jdbc;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class JdbcConnection {
    private static Connection connection;
    private static final String JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver";

    private JdbcConnection() {   }
    private static String jdbcUrl;
    private static String userName;
    private static String password;
    private static String query;
    private static String interval;
    private static final Logger logger = LoggerFactory.getLogger(JdbcConnection.class);

        Properties prop = new Properties();
        try {
            jdbcUrl = prop.getProperty("jdbc.url");
            userName = prop.getProperty("jdbc.username");
            password = prop.getProperty("jdbc.password");
            query = prop.getProperty("jdbc.query");
            interval = prop.getProperty("poll.interval");
        } catch (IOException e) {
    public static Connection getConnection() {
        if(connection != null ){
            return connection;
        try {
            connection = DriverManager.getConnection(getJdbcUrl(),getUserName(),getPassword());
        } catch (ClassNotFoundException e) {
            // throw the exception
        } catch(SQLException e){
        return connection;

    public static String getJdbcUrl() {
        return jdbcUrl;

    public static String getUserName() {
        return userName;

    public static String getPassword() {
        return password;

    public static String getQuery() {
        return query;

    public static String getInterval() {
        return interval;
Class JdbcConnection reads file from the classpath and initialize the connection.
Now we are ready to create our Oracle Spout
package storm.contrib.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

import backtype.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.contrib.jdbc.JdbcConnection;

import java.sql.*;
import java.util.*;

import static backtype.storm.utils.Utils.tuple;

public class OracleSpout implements IRichSpout{
    private SpoutOutputCollector collector;
    private TopologyContext context;
    private transient Connection connection;
    private boolean completed =false;
    private String query;
    private long interval;
    private static final Logger logger = LoggerFactory.getLogger(OracleSpout.class);
    private Fields outputFields;
    public OracleSpout(final Fields outputFields){
        this.outputFields = outputFields;
    public boolean isDistributed() {
        return false;

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    public Map<String, Object> getComponentConfiguration() {

        return null;
    // open connection to Oracle DB
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.context = topologyContext;
        // connect to DB
        connection = JdbcConnection.getConnection();
        query = JdbcConnection.getQuery();
        interval = Long.valueOf(JdbcConnection.getInterval());
    public void close() {
        if(connection != null){
            try {
            } catch (SQLException e) {

    public void activate() {

    public void deactivate() {
    // read data and send to bolt
    public void nextTuple() {
        Statement stm = null;
        if(connection != null){
            List<Object> tupleVal = new ArrayList<Object>();
            try {
                //stm = connection.createStatement();
                stm = connection.prepareStatement(query,ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
                ResultSet rs = stm.executeQuery(query);
                //ResultSetMetaData rsmd = rs.getMetaData();
                    ResultSetMetaData rsmd = rs.getMetaData();

                    for(int col=1; col <= rsmd.getColumnCount(); col++){
                        tupleVal.add(getDataByCol(rs, rsmd.getColumnType(col), col));
                    // delete the row
                    completed = true;
            } catch (SQLException e) {
            }finally {
                    try {
                    } catch (SQLException e) {

    public void ack(Object o)   {
        // @todo delete the record
        //"ack:", o);

    public void fail(Object o) {"fail:", o);
    private SpoutOutputCollector getCollector() {
        return collector;

    private Object getDataByCol(ResultSet rs, int colType, int colIdx) throws SQLException{
        Object colData;
        switch (colType){
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.CLOB:
                colData = rs.getString(colIdx);
            case Types.INTEGER:
            case Types.BIGINT:
            //case Types.:
                colData = rs.getLong(colIdx);
            case Types.DECIMAL:
            case Types.NUMERIC:
                colData = rs.getDouble(colIdx);
            case Types.DATE:
                colData = rs.getDate(colIdx);
            case Types.TIMESTAMP:
                colData = rs.getTimestamp(colIdx);
            case Types.BLOB:
                Blob blob = rs.getBlob(colIdx);
                InputStream is =  blob.getBinaryStream();
                colData = getBytes(is);
                //colData = rs.getBlob(colIdx);
                colData = rs.getString(colIdx);
        return colData;
    private byte[] getBytes(InputStream is){
        // Get the size of the file
        try {
            return IOUtils.toByteArray(is);
        } catch (IOException e) {
            return new byte[0];
The class is self explanatory itself. In open method we initialized the jdbc connection, in the nextTuple method we query the table and emit tuple for the parse bolt.
package storm.contrib.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.contrib.parse.ParseMessage;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static backtype.storm.utils.Utils.tuple;

public class ParseMsgBolt implements IRichBolt {
    private OutputCollector collector;
    private static final Logger logger = LoggerFactory.getLogger(ParseMsgBolt.class);
    TopologyContext context;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.context = topologyContext;

    public void execute(Tuple tuple) {
        // parse
        Map<String, String> msgTuple = null;
        if(logMsg.length > 0){
            ParseSmevMessage3  parseMessage = new ParseSmevMessage3();
            List ls = new ArrayList();
            try {
                msgTuple =  parseMessage.parse(new String(logMsg));
                // add logid first;
                // add parsed fields values
            } catch (XMLStreamException e) {
                // send failure msg
        //send ack

    public void cleanup() {


    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("logid",
    public Map<String, Object> getComponentConfiguration() {
        return null;

package ru.atc.smev.pig.utils;

import com.ximpleware.*;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ParseSmevMessage3 {
    private final static String CERT_TYPE = "X509";
    private static final String CERTSN = "certsn";
    private static final String ROOT_TAG = "root_tag";

    private static Map<String, String> SEARCH_TAGS = new HashMap<String, String>(17);
        SEARCH_TAGS.put("/*[local-name() = 'Envelope']/*[local-name() = 'Header']/*[local-name() = 'Security']/*[local-name() = 'BinarySecurityToken']/text()", CERTSN);
        //SEARCH_TAGS.put("/Envelope/root_tag", ROOT_TAG);
        SEARCH_TAGS.put("/*[local-name() = 'Envelope']/*[local-name() = 'Header']/*[local-name() = 'Header']/*[local-name() = 'MessageId']/text()", "msgid");

        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Sender']/*[local-name() = 'Code']/text()", "asenc");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Sender']/*[local-name() = 'Name']/text()", "asen");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Recipient']/*[local-name() = 'Code']/text()", "arc");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Recipient']/*[local-name() = 'Name']/text()", "arcn");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Originator']/*[local-name() = 'Code']/text()", "origc");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Originator']/*[local-name() = 'Name']/text()", "orign");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Date']/text()", "ard");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'TypeCode']/text()", "tcode");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'ExchangeType']/text()", "exct");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'ServiceCode']/text()", "serc");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'RequestIdRef']/text()", "reqidr");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'RequestIdRef']/text()", "reqidr");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'OriginRequestIdRef']/text()", "origridr");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'CaseNumber']/text()", "can");

    private String getSerialNumber(final String cert) throws CertificateException {
        byte[] derFile = org.bouncycastle.util.encoders.Base64.decode(cert.getBytes());

        final CertificateFactory cf = CertificateFactory.getInstance(CERT_TYPE);
        final X509Certificate x509 = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(derFile));
        // get serial number in hex
        return x509.getSerialNumber().toString(10);
    public Map<String, String> parse (String xml) throws Exception{
        final Map<String, String> result = new HashMap<String, String>(17);
        final VTDGen vg = new VTDGen();
        final AutoPilot ap = new AutoPilot();
        int i;
        try {
            VTDNav vn = vg.getNav();
            for(String key : SEARCH_TAGS.keySet()){
                while( (i = ap.evalXPath())!=-1 ){
                    //System.out.println(SEARCH_TAGS.get(key)+":"+ vn.toString(i));
                        result.put(CERTSN, getSerialNumber(vn.toString(i)));
        } catch (XPathParseException e) {
        } catch(XPathEvalException e){
        } catch(NavException e){
        return result;
I have used vtd xml library to parse the file and emit it to cassandra bolt, which store the row to the Cassandra DB.
Here is the topology class:
package storm.contrib.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.contrib.cassandra.bolt.BatchingCassandraBolt;
import backtype.storm.contrib.cassandra.bolt.CassandraBolt;
import backtype.storm.contrib.cassandra.bolt.DefaultBatchingCassandraBolt;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.contrib.bolt.ParseMsgBolt;
import storm.contrib.spout.OracleSpout;

public class CassandraBoltTopology {
    public static void main(String[] args) {

        TopologyBuilder tBuilder = new TopologyBuilder();
        // define oracle table column name 
        tBuilder.setSpout("oracle-reader", new OracleSpout(new Fields("logobject_id",
        tBuilder.setBolt("Msgparser", new ParseMsgBolt()).shuffleGrouping("oracle-reader");
        tBuilder.setBolt("save to cassandra", new CassandraBolt("stormcf", "logid")).shuffleGrouping("Msgparser");
        Config config = new Config();
        config.put(CassandraBolt.CASSANDRA_HOST, "192.168.XXX.XXX");
        config.put(CassandraBolt.CASSANDRA_PORT, 9160);
        config.put(CassandraBolt.CASSANDRA_KEYSPACE, "stormks");
  //Topology run
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", config, tBuilder.createTopology());
Here we first declare and define the column names of the table where we would collect the data. In the current storm version we could not define out put field dynamically. Next we set the bolt accordingly and configure the CassandraBolt. Project runs well on two machine cluster with any errors. Anyway i have a plan to add transaction feature in spout. Hope this will help someone to kick start in Storm. Here is the properties file:
jdbc.query=SELECT t.* FROM LOG_OBJECTS t



UnsatisfiedLinkError: JNA link failure on RHEL 5.7 with Cassandra 1.1.5

Today our team tried to install JNA 3.5.0 in our UAT environment. Here is the link to make a try. At the moment of Cassandra start we have noticed the following INFO on cassandra log:
INFO [main] 2012-11-27 13:20:17,747 (line 66) JNA link failure, one or more native method will be unavailable.
Very interesting thing is that, most of the JNA features works. I have decided to investigate the problem and restart Cassandra in debug mode (edit the and set the rootLogger level DEBUG) and found the details error
DEBUG [main] 2012-11-27 13:20:17,748 (line 67) JNA link failure details: /tmp/jna-oracle/jna1599621626582486116.tmp: /lib64/ version `GLIBC_2.11' not found (required by /tmp/jna-oracle/jna1599621626582486116.tmp)
Now it was easy to fix the problem. There is a few solution^
1) updated linux binaries for x86/amd64 against 2.1.3 and 2.2.5
2) Use JNA 3.3.0 or JNA 2.7.0 version
you can download JNA 3.3.0 from the following location
after successfully installation you should found the following INFO on log:
JNA mlockall successful
If you have another process along with Cassandra, you have to set ulimit for max locked memory to unlimited

Add the following lines in the /etc/security/limits.conf file for the user/group that runs Cassandra:

$USER soft memlock unlimited
$USER hard memlock unlimited

and reboot the system.


Patch pig_cassandra for setting ttl to cassandra data

Apache pig provides a platform for analyzing very large data set. With apache pig you can easily analyze your data from Cassandra. Apache pig compiles instruction to sequences of Map-Reduce programs which will run on Hadoop cluster. Cassandra source provides a simple pig script to run pig with Cassandra data. Cassandra also provides CassandraStorage class which will load and store data from Cassandra DB, this class will no built in support for storing data with TTL (time to live). In many cases you have to update a few columns or rows with ttl to delete later automatically from DB. For that, i have patched the CassandraStorage class and add the similar functionality. Here is the patch
Index: src/main/java/ru/atc/smev/cassandra/storage/
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
--- src/main/java/ru/atc/smev/cassandra/storage/ (revision 3711)
+++ src/main/java/ru/atc/smev/cassandra/storage/ (revision )
@@ -90,7 +90,7 @@
     private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-    private static final Log logger = LogFactory.getLog(CassandraStorage.class);
+    private static final Log logger = LogFactory.getLog(MyCassandraStorage.class);
     private ByteBuffer slice_start = BOUND;
     private ByteBuffer slice_end = BOUND;
@@ -113,6 +113,7 @@
     private Map<bytebuffer olumn="olumn"> lastRow;
     private boolean hasNext = true;
+    private int ttl;
     public CassandraStorage()
@@ -131,8 +132,13 @@
     public int getLimit()
         return limit;
+    public int getTtl() {
+        return ttl;
+    }
     public Tuple getNextWide() throws IOException
         CfDef cfDef = getCfDef(loadSignature);
@@ -337,14 +343,14 @@
     private CfDef getCfDef(String signature)
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         return cfdefFromString(property.getProperty(signature));
     private List<indexexpression> getIndexExpressions()
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
             return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
@@ -462,6 +468,8 @@
                     slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
                 if (urlQuery.containsKey("limit"))
                     limit = Integer.parseInt(urlQuery.get("limit"));
+                if(urlQuery.containsKey("ttl"))
+                    ttl = Integer.parseInt(urlQuery.get("ttl"));
             String[] parts = urlParts[0].split("/+");
             keyspace = parts[1];
@@ -469,7 +477,7 @@
         catch (Exception e)
-            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
+            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&ttl=86400]]': " + e.getMessage());
@@ -694,7 +702,7 @@
     public void setPartitionFilter(Expression partitionFilter)
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
@@ -901,6 +909,11 @@
+            if(getTtl() != 0){
+                column.setTtl(getTtl());
+                column.setTtlIsSet(true);
+            }
             mutation.column_or_supercolumn = new ColumnOrSuperColumn();
             mutation.column_or_supercolumn.column = column;
@@ -924,6 +937,11 @@
+                    if(getTtl() != 0){
+                        column.setTtl(getTtl());
+                        column.setTtlIsSet(true);
+                    }
                 if (columns.isEmpty())
@@ -980,7 +998,7 @@
     private void initSchema(String signature)
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         // Only get the schema if we haven't already gotten it
         if (!property.containsKey(signature))
You can build the Cassandra with the above patch and using it on your pig script as follows:
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING CassandraStorage();
Or you can use the new class named MyCassandraStorage as a pig UDF function. First you have to compile and archive the class in jar. Later you can define the class in your pig script and use as follows:
DEFINE ParseSmevMessage;
raw = LOAD 'cassandra://audit/auditlog' USING MyCassandraStorage();
filtered = FILTER raw BY  processed.$1=='N';
updated = FOREACH filtered GENERATE  key,TOTUPLE('processed','Y');
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING MyCassandraStorage();


Another Cassandra data manipulation api - PlayOrm

Recently i have found one interesting project on Github named PlayOrm, which features very impress me. I have decided to just play with it. Lets first check out there features list:

  1. Just added support for Entity has a Cursor instead of List which is lazy read to prevent out of memory on VERY wide rows
  2. PlayOrm Queries use way less resources from cassandra cluster than CQL queries
  3. Scalabla JQL(SJQL) supported which is modified JQL that scales(SQL doesn't scale well)
  4. Partitioning so you can query a one trillion row table in just ms with SJQL(Scalable Java Query Language)
  5. Typical query support of <=, <, >, >= and = and no limitations here
  6. Typical query support of AND and OR as well as parenthesis
  7. Inner Join support (Must keep your very very large tables partitioned so you get very fast access times here)
  8. Left Outer Join support
  9. Return Database cursor on query
  10. OneToMany, ManyToMany, OneToOne, and ManyToOne but the ToMany's are nosql fashion not like RDBMS
  11. support of a findAll(Class c, List keys) as is typical in nosql to parallel the reads
  12. Inheritance class heirarchy in one table is supported like hibernate
  13. flush() support - We protect you from failures!!!
  14. first level read cache
  15. Automatically creates ColumnFamilies at runtime
  16. Includes it's own in-memory database for TDD in your unit tests!!!!!
  17. Saves you MORE data storage compared to other solutionst
  18. logging interface below the first level cache so you can see the raw
    operations on cassandra and optimize just like when you use hibernate's
  19. A raw interface using only BigDecimal, BigInteger, and String types
    which is currently used to upload user defined datasets through a web
    interface(and we wire that into generating meta data so they can ad-hoc
    query on the nosql system)
  20. An ad-hoc query interface that can query on any table that was from
    an Entity object. To us on other tables, you can also code up and save
    DboTableMeta objects and the ad-hoc query interface gets you query
    support into those tables
  21. IF you have some noSQL data and some Relational data, store your
    relational data in noSQL now and just maintain one database in
  22. support for joda-time LocalDateTime, LocalDate, LocalTime which
    works way better than java's Date object and is less buggy than java's
    Date and Calendar objects
  23. Command Line tool.  
Impressive yah )) Feature 4 can Partitioning can replace Cassandra composite primary key feature. What i gave done - just clone the project from the git hub. Import the project in IntelliJ idea and start coding.

First i made a try to feature Inner Join.

1) Start my local Cassandra data base.
2) Create an Keyspace named MyKeyspace through CQL as follows:

CREATE KEYSPACE MyKeyspace WITH strategy_class='SimpleStrategy'
 AND strategy_options:replication_factor=1;
3) Create two simple java Pojo with PlayOrm annotations:
Entity log  - one to one relation with Entity event
@NoSqlQuery(name="findlog", query="select *  FROM Log as l INNER JOIN l.event as ee where l.user=:user")
public class Log {
    private int id;
    //private String
    private String msg;
    private String user;
    private Date   time;
    private Event event;
    public int getId() {
        return id;
    public void setId(int id) { = id;
    public String getMsg() {
        return msg;
    public void setMsg(String msg) {
        this.msg = msg;
    public String getUser() {
        return user;
    public void setUser(String user) {
        this.user = user;
    public Date getTime() {
        return time;
    public void setTime(Date time) {
        this.time = time;
    public Event getEvent() {
        return event;
    public void setEvent(Event event) {
        this.event = event;
Entity Event 

import com.alvazan.orm.api.base.anno.NoSqlEntity;
import com.alvazan.orm.api.base.anno.NoSqlId;
import com.alvazan.orm.api.base.anno.NoSqlIndexed;
public class Event {
    private int id;
    private String code;
    private String name;
    //private Log log;
    public int getId() {
        return id;
    public void setId(int id) { = id;
    public String getCode() {
        return code;
    public void setCode(String code) {
        this.code = code;
    public String getName() {
        return name;
    public void setName(String name) { = name;
for quick start better to use PlayOrm FactorySingleton which you can found it the test package
package com.alvazan.test;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alvazan.orm.api.base.Bootstrap;
import com.alvazan.orm.api.base.DbTypeEnum;
import com.alvazan.orm.api.base.NoSqlEntityManagerFactory;
public class FactorySingleton {
private static final Logger log = LoggerFactory.getLogger(FactorySingleton.class);
private static NoSqlEntityManagerFactory factory;

public static Config getConfigForAllTests() {
String clusterName = "Test Cluster";
//DbTypeEnum serverType = DbTypeEnum.IN_MEMORY;
    DbTypeEnum serverType = DbTypeEnum.CASSANDRA;
String seeds = "localhost:9160";

return new Config(serverType, clusterName, seeds);

public synchronized static NoSqlEntityManagerFactory createFactoryOnce() {
if(factory == null) {
Config config = getConfigForAllTests();
//We used this below commented out seeds to test our suite on a cluster of 6 nodes to see if any issues pop up with more
//nodes using the default astyanax consistency levels which I believe for writes and reads are both QOURUM
//which is perfect for us as we know we will get the latest results
//String seeds = ",,";
Map<string object="object"< props = new HashMap<string object="object">();
factory = createFactory(config, props);
return factory;

public static NoSqlEntityManagerFactory createFactory(Config config, Map<string object="object"> props) {"CREATING FACTORY FOR TESTS");
props.put(Bootstrap.AUTO_CREATE_KEY, "create");
switch (config.getServerType()) {
//nothing to do
Bootstrap.createAndAddBestCassandraConfiguration(props, config.getClusterName(), "MyKeyspace", config.getSeeds());
throw new UnsupportedOperationException("not supported yet, server type="+config.getServerType());

NoSqlEntityManagerFactory factory = Bootstrap.create(config.getServerType(), props, null, null);
return factory;
Now it's time to put some data on Cassandra and write Managed query

package com.alvazan.test;

import com.alvazan.orm.api.base.NoSqlEntityManager;
import com.alvazan.orm.api.base.NoSqlEntityManagerFactory;
import com.alvazan.orm.api.base.Query;
import com.alvazan.test.db.Email;
import com.alvazan.test.db.User;
import com.alvazan.test.mytest.Event;
import com.alvazan.test.mytest.Log;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class BasicTest {
    public static void main(String[] args) {
        // create connection factory
        NoSqlEntityManagerFactory factory = FactorySingleton.createFactoryOnce();
        NoSqlEntityManager mgr = factory.createEntityManager();
        Event event = new Event();
        event.setName("Validation failed");
        Log log = new Log();
        log.setTime(new Date(System.currentTimeMillis()));
        // query
        Query query = mgr.createNamedQuery(Log.class, "findlog");
        List l = query.getResultList(0,100);
        System.out.println("Result Size: "+ l.size());
For partitioning query you have to defined managed query similarly

PARTITIONS e(:partitionId) select * FROM TABLE as e WHERE e.user = :user
Most of the example with Cassandra you will found on the com.alvazan.test package. At first glance the framework is very impressive with lot of unique features. For me it will be useful to reindex or create new index from existing data through map/reduce. This feature is in their up coming features list. I will be very happy to see the feature in next version.

A single node Hadoop + Cassandra + Pig setup

UP1: Our book High Performace in-memory computing with Apache Ignite has been released. The book briefly described how to improved performance in existing legacy Hadoop cluster with Ignite and achieve ACID transaction over Cassandra through Apache Ignite.

In our current project we have decided to store all operational logs in to NoSQL DB. It's total volume about 97 TB per year. Cassandra was our main candidate to use as NoSQL DB. But we also have to analysis and monitor our data, where comes Hadoop and Pig to help. Within 2 days our team able to developed a simple pilot projects to demonstrate all the power of Hadoop + Cassandra and Pig.
For the pilot project we used DataStax Enterprise edition. Seems this out of box product help us to quick install hadoop, cassandra stack and developed our pilot project. Here we made decision to setup Hadoop, cassandra and Pig by our self. It's my first attempt to install Cassandra over hadoop and Pig. Seems all this above products already running already a few years, but i haven't found any step step by tutorial to setup a single node cluster with Hadoop + Cassandra + pig.
First of all we are going to install Hadoop and Cassandra, therefore, will try to run pig_cassandra Map only job over Cassandra column family which will save the result on Hadoop HDFS file system.
Setup Hadoop:
1) Download hadoop from the following link - then un archive the file
tar -xvf hadoop-0.20.2.tar.gz
rm hadoop-0.20.2.tar.gz
cd hadoop-0.20.2
2) Edit /conf/core-site.xml. I have used localhost in the value of

3) Edit /conf/mapred-site.xml.

4) Edit /conf/hdfs-site.xml. Since this test cluster has a single node, replication factor should be set to 1.

5) Set your JAVA_HOME variable in /conf/ If you already have the JAVA_HOME variable in your .bash_profile - it's redundant.
6) Format the name node (one per install).
$ bin/hadoop namenode -format
it should print out the following message
12/07/15 15:54:20 INFO namenode.NameNode: STARTUP_MSG: 
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = Shamim-2.local/
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 0.20.2
STARTUP_MSG:   build = -r 911707; compiled by 'chrisdo' on Fri Feb 19 08:07:34 UTC 2010
12/07/15 15:54:21 INFO namenode.FSNamesystem: fsOwner=samim,staff,,everyone,_appstore,localaccounts,_appserverusr,admin,_appserveradm,_lpadmin,_lpoperator,_developer,
12/07/15 15:54:21 INFO namenode.FSNamesystem: supergroup=supergroup
12/07/15 15:54:21 INFO namenode.FSNamesystem: isPermissionEnabled=true
12/07/15 15:54:21 INFO common.Storage: Image file of size 95 saved in 0 seconds.
12/07/15 15:54:21 INFO common.Storage: Storage directory /tmp/hadoop-samim/dfs/name has been successfully formatted.
12/07/15 15:54:21 INFO namenode.NameNode: SHUTDOWN_MSG: 
SHUTDOWN_MSG: Shutting down NameNode at Shamim-2.local/
6.1) set up passphraseless ssh
Check that you can login into localhost without passphrase
ssh localhost
if you cannot than first enable your ssh server
system preferences-> sharing-> check the box for remote loging, also you can allow access for all user
then execute the following commands
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/ >> ~/.ssh/authorized_keys
7) Start all hadoop components
$ bin/ start namenode
$ bin/ start jobtracker
$ bin/ start datanode
$ bin/ start tasktracker
$ bin/ start secondarynamenode
starting namenode, logging to /Users/samim/Development/NoSQL/hadoop/core/hadoop-0.20.2/bin/../logs/hadoop-samim-namenode-Shamim-2.local.out
starting jobtracker, logging to /Users/samim/Development/NoSQL/hadoop/core/hadoop-0.20.2/bin/../logs/hadoop-samim-jobtracker-Shamim-2.local.out
starting datanode, logging to /Users/samim/Development/NoSQL/hadoop/core/hadoop-0.20.2/bin/../logs/hadoop-samim-datanode-Shamim-2.local.out
you can check all the log file to make sure that everything goes well.
8) Verify the NameNode and DataNodes communication through web interface. http://localhost:50070/dfshealth.jsp Check the page and confirm that you have one Live node
9) Verify that the JobTracker and TaskTrackers are communicating by looking at the JobTracker web interface and confirming one node listed in the Nodes column: http://localhost:50030/jobtracker.jsp
10) Use the hadoop command-line tool to test the file system:
$ hadoop dfs -ls /
$ hadoop dfs -mkdir /test_dir
$ echo "A few words to test" > /tmp/myfile
$ hadoop dfs -copyFromLocal /tmp/myfile /test_dir
$ hadoop dfs -cat /test_dir/myfile
A few words to test
Setup Cassandra: 1) Download the source code for cassandra verion 1.1.2 from the following link assume you know how to build the cassandra from the source code, otherwise you will find a lot of information though google to build cassandra from the source code.
2) Edit CASSANDRA_HOME/conf/cassandra.yaml file to set the listen_address and rpc_address to localhost.
3) Start cassandra $ cassandra/bin/ ./cassandra 4) Check the cluster through node tool utility
$ /bin ./nodetool -h localhost ring
Note: Ownership information does not include topology, please specify a keyspace. 
Address         DC          Rack        Status State   Load            Owns                Token                                     datacenter1 rack1       Up     Normal  55.17 KB        100.00%         96217188464178957452903952331500076192  
Cassandra cluster starts up, now we are going to configure pig
Setup Pig: 1) Download pig from the apache site as follows tar -xvf pig-0.8.0.tar.gz rm pig-0.8.0.tar.gz At this moment we will try to run the pig_cassandra example which you can find with the source distribution. First of all it's better to read the README.TXT file from the folder apache-cassandra-1.1.2-src/examples/pig/README.txt Set all the env variables describes in the readme.txt file as follows:
export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner
Also if you would like to run using the Hadoop backend, you should also set PIG_CONF_DIR to the location of your Hadoop config. In my cases
export PIG_CONF_DIR=hadoop/core/hadoop-0.20.2/conf
In this stage you can run grunt shell to run map reduce task, run examples/pig$ bin/pig_cassandra -x local it should prompt grunt shell, but i have got the following clssnofound exception: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.RunningJob For quick fix, i decide to edit the pig_cassandra file as follows:
export HADOOP_CLASSPATH="/Users/xyz/hadoop/core/hadoop-0.20.2/hadoop-0.20.2-core.jar"
While i got the grunt shell, i create a keyspace and one column family in cassandra cluster and insert some value through cassandra-cli
[default@unknown] create keyspace Keyspace1;
  [default@unknown] use Keyspace1;
  [default@Keyspace1] create column family Users with comparator=UTF8Type and default_validation_class=UTF8Type and key_validation_class=UTF8Type;
  [default@KS1] set Users[jsmith][first] = 'John';
  [default@KS1] set Users[jsmith][last] = 'Smith';
  [default@KS1] set Users[jsmith][age] = long(42)
then i run following pig query in grunt shell
grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});
grunt> cols = FOREACH rows GENERATE flatten(columns);
grunt> colnames = FOREACH cols GENERATE $0;
grunt> namegroups = GROUP colnames BY (chararray) $0;
grunt> namecounts = FOREACH namegroups GENERATE COUNT($1), group;
grunt> orderednames = ORDER namecounts BY $0;
grunt> topnames = LIMIT orderednames 50;
grunt> dump topnames;
Pig run the script and here is the statistics:
2012-07-15 17:29:35,878 [main] INFO - Detected Local mode. Stats reported below may be incomplete
2012-07-15 17:29:35,881 [main] INFO - Script Statistics: 

HadoopVersion PigVersion UserId StartedAt FinishedAt Features
0.20.2 0.8.3 samim 2012-07-15 17:29:14 2012-07-15 17:29:35 GROUP_BY,ORDER_BY,LIMIT


Job Stats (time in seconds):
JobId Alias Feature Outputs
job_local_0001 colnames,cols,namecounts,namegroups,rows GROUP_BY,COMBINER 
job_local_0002 orderednames SAMPLER 
job_local_0003 orderednames ORDER_BY,COMBINER file:/tmp/temp-833597378/tmp-220576755,

Successfully read records from: "cassandra://Keyspace1/Users"

Successfully stored records in: "file:/tmp/temp-833597378/tmp-220576755"

Job DAG:
job_local_0001 -> job_local_0002,
job_local_0002 -> job_local_0003,

2012-07-15 17:29:35,881 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,886 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,887 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,888 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2012-07-15 17:29:35,904 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,907 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2012-07-15 17:29:35,907 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
You should found the output file in the hadoop file system tmp. In my case its
If you would like to run the example-script.pig, you would have to create one KeySpace name MyKeySpace and column family according to the pig script. I just edit the example-script.pig and set the newly created keyspace1 and column family Users. Then you can run it like this:
examples/pig$ bin/pig_cassandra example-script.pig
If you want to run the pig in local mode, add the following predicates -x local. For example pig_cassandra -x local example-script.pig. Without the instruction -x local, pig will run on Hadoop mode. See here for more information. Thank'x Nabanita to point out this moment.

see the statistics in the console. My next step is to set up cassandra cluster with 4 nodes over Hadoop and run Map reduce all over the cluster nodes.
1) Cassandra high performance cook book.
2) Cassandra definitive guide.


Oracle Enterprise gateway load test

Last few weeks we have done a few exercises with OEG, develop a lot of filters, asynchronous delivery with JMS and much more. Last week we decided to do some load test, specially how many web service we can register in OEG. We develop jython script to register web services and assign policy in web services. However it's very tough to collect a big amount of web services online - we decided to register on web service in separate web service group, because you can register two similar web service in one group. The Jython script is as follows:
Register WSDL group in Gateway

from java.util import ArrayList
from deploy import DeployAPI
from esapi import EntityStoreAPI
from import AddWebServiceServletProcessor
from vtrace import Tracer
import common
import datetime

t = Tracer(Tracer.INFO) # Set trace to info level
gw_deployURL = "http://localhost:8090/configuration/deployments/DeploymentService"
dep = DeployAPI(gw_deployURL, common.defUserName, common.defPassword)
es = dep.getStore('')

t0 =
print t0
wsdlURL = ''
deployLocation = es.get('/[NetService]name=Service/[HTTP]name=Default Services')
defaultWsGroup = es.get('/[WebServiceRepository]name=Web Service Repository/[WebServiceGroup]name=Web Services')"---------------------------------")
# add new group on default webservice group
aws = AddWebServiceServletProcessor(
deploymentPks = ArrayList()
for i in range(1,100):
 try:"Count Service :"+ str(i))
  wsGroup = es.addNewEntity(defaultWsGroup,"WebServiceGroup",{"name":"test"+str(i)})
  aws.addWebService(wsGroup, wsdlURL, deploymentPks, 60)
  # add policy Request and response
  #Add sample policy to user interception point.
  #Get generated entites for the web-service
  entities = ArrayList()
  entities = aws.getCreatedWebServiceEntities()
  en = entities[0]"---------------------------------")"Name: "+en.getStringValue("name")+"\n")
  #Looking for generated circuit for the web-service by service name
  policy = es.get('/[CircuitContainer]name=Generated Policies/[CircuitContainer]name=Web Services.'+'test'+str(i)+'.'+en.getStringValue("name")+'/[FilterCircuit]/[WSFilter]/[OperationCircuitChain]name=Request From Front End')"---------------------------------")"Name: "+policy.getStringValue("name")+"\n")
  #Attach sample policy to generated circuit
  #Modify the path to ur policy here
  pk = es.get('/[CircuitContainer]name=Policy Library/Modify the path to ur policy here');
  # -------- add Response handler ---------
  #Looking for generated circuit for the web-service by service name [Response handler]
  policyResponse = es.get('/[CircuitContainer]name=Generated Policies/[CircuitContainer]name=Web Services.'+'test'+str(i)+'.'+en.getStringValue("name")+'/[FilterCircuit]/[WSFilter]/[OperationCircuitChain]name=Response From Back End')"---------------------------------")"Name: "+policyResponse.getStringValue("name")+"\n")
  pkResponse = es.get('/[CircuitContainer]name=Policy Library/Modify the path to ur policy here');
  #Write updated policy to store"---- update request policy-----")
  es.updateEntity(policy)"---- update response policy-----")
 except Exception, err:
  print "something wrong with registration!" 
  print "error:"+ str(err)

#Deploy new configuration"---- update entity -----")
res = dep.setStore(es)"---- Web services registered-----")
if res.getStatus() != True:
    t.error("Failed to deploy: " + res.getFailureReason())
    #t.error("Failures: "+ Integer.toString(res.getErrorCount())) 
# close entity Store  
delta_t = - t0 
print delta_t"end of commands...")
The above script will register 99 web services in one transaction. run the script from the directory %OEG_HOME%\samples\scripts\run.bat For load test we use Oracle Linux machine with 2 cpu, 6 Gb RAM. For better performance we increased heap size for the OEG to 4 gb in the jvm.xml as follows:
    <vmarg name="-Xmx4000m"/>
    <vmarg name="-Xms4000m"/>
    <vmarg name="-XX:PermSize=128m"/>
    <vmarg name="-XX:MaxPermSize=128m"/>
    <vmarg name="-XX:+UseConcMarkSweepGC"/>
Here is the Log of all step: Register 100 service - elapsed time 6 min - memory in use after registration 120 MB Register 200 service - elapsed time 9 min - memory in use after registration 280 MB Register 300 service - elapsed time 17 min - memory in use after registration 470 MB Restart OEG - memory in use 200 MB Register 400 service - elapsed time 27 min - memory in use after registration 900 MB Register 450 service - elapsed time 13 min - memory in use after registration 1400 MB Register 500 service - elapsed time 21 min - memory in use after registration 1700 MB Register 520 service - elapsed time 14 min - memory in use after registration 1900 MB Register 540 service - elapsed time 16 min - memory in use after registration 2100 MB After 540 service registration, OEG start crush with java.lang.OutOfMemoryError: Java heap space. Within registration OEG always replace old configuration file with new configuration file, but OEG could not release the allocated memory. After 540 service any deployment to OEG failed with crush. It indicate memory leak ((. In the time of deployment OEG always allocated double of it's uses memory - it means if OEG uses 500 MB memory - in the time of deployment it will need 1gb of memory. Here is the stack trace:
Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(
        at java.lang.String.<init>(
        at java.lang.StringBuffer.toString(
We also use JvisualVM to profilling the JVM. Here is an illustration from my local machine
The result of the load is that, OEG contain memory leak in it, also if you want to register a big amount web services you have to need much more memory to healthy your heap of OEG.


Register webservices wsdl in OEG through script

In my previous Post blogger Mark O'Neill pointed me to check new version of OEG, which has been shipped with sample scripts. Last weekend i decided to spent some time to digging out these samples. The following a list of sample scripts which ship with the Gateway: - analyse Perform various analysis on configuration - certs Examples working with certificates - io Exporting and importing configuration - publish Publishing new types to the configuration - upgrade Upgrading older versions of configuration - users Examples working with users - ws Working with Webservices and WSDLs It's pretty easy yo run these scripts, for example sh ws\ will show the list of the registered web service. Similarly will register web service wsdl into OEG. I have simply modify the script to register web services wsdl from file, where file contain a list of web service wsdl as follows:
and here is the modified jython script:
Register WSDL in Gateway

from java.util import ArrayList
from deploy import DeployAPI
from esapi import EntityStoreAPI
from import AddWebServiceServletProcessor
from vtrace import Tracer
import common

t = Tracer(Tracer.INFO) # Set trace to info level

dep = DeployAPI(common.gw_deployURL, common.defUserName, common.defPassword)
es = dep.getStore('')

deployLocation = es.get('/[NetService]name=Service/[HTTP]name=Default Services')
wsGroup = es.get('/[WebServiceRepository]name=Web Service Repository/[WebServiceGroup]name=Web Services')

aws = AddWebServiceServletProcessor(
file = open("wsdls.txt", "r")
for line in file.readlines():
 print line
 deploymentPks = ArrayList()
 aws.addWebService(wsGroup, line.strip(), deploymentPks, 60)
 res = dep.setStore(es)
 if res.getStatus() != True:
     t.error("Failed to deploy: " + res.getFailureReason())
     t.error("Failures: "+ Integer.toString(res.getErrorCount())) 


Next step is to register web service and assign some global policy on them. P.S. Thank's to Mark O'Neill for pointing me about new OEG version


Manipulating Oracle Gateway Entity Store with gateway SDK

Oracle Enterprise Gateway (OEG) is built in gateway product from company Vordel to simplify and secure SOA deployments. OEG replaces Oracle web service manager functionality for SOA development. In real life, most of all time we have a lot of services to registered in OWSM or in OEG. Even more, it was not possible to migrate registered services from one node to another on OWSM. When we got plan to migrate from OWSM to OEG, our main aim was to register web services automatically through API. I was very happy, when found OEG provides some SDK to working with registry. Here is the first attempt to working with OEG SDK. We will use maven to build our project. OEG entity store consolidate all the entities and objects uses in the repository, for example all the registered services and policies. Through Entity store you can add, update and delete any entity. In OTN you can find one tutorial to develop a custom policy through OEG SDK. OEG also provide entity explorer to working with entity store, %OEG_HOME%\Win32\bin\esexplorer.bat First we will install all the necessary libraries on maven local repository.
set OEG_HOME=c:/OEG/gateway/system

call mvn install:install-file -DgroupId=com.vordel -DartifactId=client -Dversion=1.0 -Dfile=%OEG_HOME%/lib/client.jar -Dpackaging=jar -DgeneratePom=true
call mvn install:install-file -DgroupId=com.vordel -DartifactId=common -Dversion=1.0 -Dfile=%OEG_HOME%/lib/common.jar -Dpackaging=jar -DgeneratePom=true
call mvn install:install-file -DgroupId=com.vordel -DartifactId=entityStore -Dversion=1.0 -Dfile=%OEG_HOME%/lib/entityStore.jar -Dpackaging=jar -DgeneratePom=true
call mvn install:install-file -DgroupId=org.apache.axis -DartifactId=axis -Dversion=1.0 -Dfile=%OEG_HOME%/lib/modules/axis.jar -Dpackaging=jar -DgeneratePom=true
call mvn install:install-file -DgroupId=org.apache.commons -DartifactId=commons-discovery -Dversion=0.2 -Dfile=%OEG_HOME%/lib/modules/commons-discovery-0.2.jar -Dpackaging=jar -DgeneratePom=true
call mvn install:install-file -DgroupId=org.apache.commons -DartifactId=commons-logging -Dversion=1.0.4 -Dfile=%OEG_HOME%/lib/modules/commons-logging-1.0.4.jar -Dpackaging=jar -DgeneratePom=true
call mvn install:install-file -DgroupId=javax.xml.rpc -DartifactId=jaxrpc -Dversion=1.0 -Dfile=%OEG_HOME%/lib/modules/jaxrpc.jar -Dpackaging=jar -DgeneratePom=true
call mvn install:install-file -DgroupId=javax.wsdl -DartifactId=javax.wsdl_api -Dversion=1.6.2 -Dfile=%OEG_HOME%/lib/plugins/javax.wsdl_api_1.6.2.jar -Dpackaging=jar -DgeneratePom=true
rem add also Win32\lib\vjni.lib in your classpath
After running the above script, we will got all the libraries on our maven local repository. For working correctly, we also have to add vjni.dll file on our classpath. For windows we can do it through PATH variable, for linux platform we can use LD_LIBRARY_PATH. Next we have to create a maven project and add all the dependencies on it as follows:
Now we are ready to write some code and manipulate with OEG entity store. Here is our some pseudo code to manipulate entity.
public class EntityStoreFactory {
    private EntityStoreFactory(){};
    public static EntityStore getEntityStore(final String URL, final String userName, final String password) throws EntityStoreException{ efs =;
        efs.registerForURL(URL,, new ArrayList<string>(2){{add(userName); add(password);}});
        return efs.getEntityStoreForURL(URL);
For testing purpose we are using SOAP provider to connect, but it's possible to use another type of provide such as LDAP, DB e.t.c. You can find the configuration file for the entity store factory in the following place %OEG_HOME%\gateway\system\conf\esproviders.xml .Even you can initialize provider as follows^
efs.initializeProviders(new FileInputStream("%OEG_HOME%\\enterprisegateway\\system\\conf\\esproviders.xml"));
through getEntityStore we can get the entityStore and ready to manipulate entity.
public class AddEntity {
    private static Properties props = new Properties();

    public static void main(String[] args) {
        final String url = "http://localhost:8090/configuration/policies";

        System.out.println("Test Vordel ES api");
        EntityStore es = null;
        try {
            es = EntityStoreFactory.getEntityStore(url,"admin","changeme");
            es.connect(url, props);
            // test connection
            ESPK rootESPK =  es.getRootPK();
            System.out.println("Root:"+ rootESPK.toString());
            /** Get the example webservice entity for simplicity, assume we have one web service registered **/
            // get the web services
            ESPK espkBRs = new ESStringPK("DEFAULT_PRIMARY_OracleGateway_6.1.2:1668"); // added through service manager
   entBrs = es.getEntity(espkBRs);
            // add test entity
            ESPK parentPK = new ESStringPK("DEFAULT_PRIMARY_OracleGateway_6.1.2:147");
   newEntity = new; // webservice type
            // add more fields
            // add parent key
            newEntity.setStringField("name", "testby-api");
            Value primaryWsdlValue = new Value();
            newEntity.setReferenceField("primaryWSDL", new ESStringPK("DEFAULT_PRIMARY_OracleGateway_6.1.2:1670"));

            ESPK newAddEspk = es.addEntity(parentPK, newEntity);
            System.out.println("NewEspk:"+ newAddEspk);
            // add more element on it - ex wsdl
            EntityType wsdlEType = new EntityTypeImpl(es, "");
   wsdlEntity = new;
            wsdlEntity.setStringField("uri", "http://localhost:9000/StockQuote?WSDL");
            wsdlEntity.setStringField("wsdl", "http://localhost:9000/StockQuote?WSDL");
            es.addEntity(newAddEspk, wsdlEntity);
            // add Generated circuits
            // get Circuite container for example
            ESPK parentCircuiteCPK = new ESStringPK("DEFAULT_PRIMARY_OracleGateway_6.1.2:1655");
   newCCType = new; // Contained type
            ESPK newCCForTest = es.addEntity(parentCircuiteCPK, newCCType);
            // add filter circuit
        } catch (EntityStoreException e) {
        } finally {
                try {
                } catch (EntityStoreException e) {
Now we can use entity explorer to check our newly created web service entity.
Our newly created web service is not fully configured yet, we have to add circuit policy on it. Next post will describe how to add circuit policy briefly. Happy coding))


Manage application configuration with JMX in JBOSS application server

Most of the time developers likes manage their application configuration on separate file, which contains name value pair. In my current project one of my team member also implements such a configuration through Spring. Put the file on Jboss %jboss-as%/server/xyz/conf folder which will picked by the spring on the startup of the server. I have asked him, what should i do to change the value of the configuration. He replied, you have to change the file and restart the server or start and stop the application ))). Certainly we face a lof of times these type of use cases. I told him about JMX and decided to make some quick change on code. Todays post is about JMX. For more information about JMX use cases, check the following links JMX use cases. At first we will create one interface and his implements, which will be our resource to manage by JMX. here is the fragment code of the classes:
public interface ConfigMBean {
    public void setURL(String url);
    public String getURL();

    public void setUserName(String useName);
    public String getUserName();

    public void setPassword(String password);
    //public String getPassword();

    public void setDownloadTimeout(long timeout);
    public long getDownloadTimeout();
Implemention of the interface goes here:
public class ConfigImpl implements ConfigMBean {
    private String url;
    private String userName;
    private String password;
    private long   downloadTimeout;

    public void setURL(String url) {
        this.url = url;

    public String getURL() {
        return url;

    public void setUserName(String useName) {
        this.userName = useName;

    public String getUserName() {
        return userName;

    public void setPassword(String password) {
        this.password = password;

    public void setDownloadTimeout(long timeout) {
        this.downloadTimeout = timeout;

    public long getDownloadTimeout() {
        return downloadTimeout;
Now it's time to configure spring context to initialize our beans.
<beans xmlns=""

    <import resource="classpath:META-INF/cxf/cxf.xml"/>
    <import resource="classpath:META-INF/cxf/cxf-servlet.xml"/>
    <import resource="classpath:META-INF/cxf/osgi/cxf-extension-osgi.xml"/>

    <context:component-scan base-package="com.blu"/>

    <bean name="placeholderConfig" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="ignoreUnresolvablePlaceholders" value="true"/>
        <property name="locations">

    <bean id="ConfigJMX" class="xyz.config.ConfigImpl">
        <property name="URL" value="${BasePath}"/>
        <property name="userName" value="${Login}"/>
        <property name="password" value="${Password}"/>
        <property name="downloadTimeout" value="${DownloadTimeout}"/>
    <!-- JMX config-->
    <bean id="exporter" class="org.springframework.jmx.export.MBeanExporter" lazy-init="false">
        <property name="beans">
                <entry key="xyz:name=JConfig" value-ref="ConfigJMX"/>

Code snippet is self explainable, however, bean named placeholderConfig used for reading name value pair from the file system when application starts. Configuration file is as follows:
Through spring Mbean exporter we exposes our pojo ConfigImpl to MBean server, and that's it. We can also use annotation @ManagedResource to expose pojo as a Mbean and auto wire by annotation @Component. Now we can deploy our application on Jboss and our Mbean is ready to use. We can use Jconsole to use our Mbean, which is shipped with JDK 1.5 and above. In order to use Jconsole to display Jboss Mbeans we have to add some properties on server boot time. Add these followings params to %JBOSS_HOME%/bin/
# Enable the jconsole agent locally
# Tell JBossAS to use the platform MBean server
JAVA_OPTS="$JAVA_OPTS -Djboss.platform.mbeanserver"
# Make the platform MBean server able to work with JBossAS MBeans
see the following link for more information. Now you can run Jconsole and connect to your Jboss application server locally or remotely and set URL or user name, see the image below.
You can also use Jboss JMX Console to edit Mbean properties.
For more information to configure JMX through spring see here. Happy coding)))


Configure Nginx to working with WebLogic 12C

Nginx is a free, open-source, high-performance HTTP server and reverse proxy server, which can be use with WebLogic application server to cache static page. It's also able to load balancing between servers. However nginx default proxy pass configuration not working properly with WebLogic server, because WebLogic server reset his http header which changes host and port. Here is the configuration for Ngnix proxy pass:
        proxy_cache_path usr/apps/nignx/nginx-1.1.12/cache/ levels=1:2 keys_zone=data-cache:8m max_size=1000m inactive=600m;
 proxy_temp_path usr/apps/nignx/nginx-1.1.12/cache/temp; 

 upstream osbapp{
        server {
          listen       8001;

  location / {
   proxy_set_header Host $http_host; # set the parameter for fine granned header
   proxy_set_header X-Real-IP $remote_addr;
   proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
   proxy_store off;
   proxy_redirect off;
   proxy_buffering off;  
   proxy_cache data-cache;
   proxy_cache_valid 200 302 60m;
   proxy_pass  http://osbapp;