Wednesday, December 19, 2012

Real time data processing with Storm, ETL from Oracle to Cassandra

For the last couple of months we have been using Apache Hadoop MapReduce batch processing to analyze a huge amount of data. We have a few legacy products for which we cannot consider using the Cassandra big-table database; some of them use Oracle Database as their primary storage. Per our requirements, we have to extract the data from the RDBMS, parse the payload and load it into Cassandra for aggregation. Here I have decided to use Storm for real-time data processing. Our use case is as follows:
1) A Storm spout connects to the Oracle Database and collects data from a particular table at regular intervals.
2) A Storm bolt parses the data and emits it to the Storm-Cassandra bolt, which stores the row in the Cassandra DB.

Here is a code fragment from the project. First I created a JDBC connector class. The class contains a few static (class-level) variables, which contradicts the Storm ideology, but so far I need just one spout as input, so it is enough for me.
package storm.contrib.jdbc;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;


/**
 * Static holder for the JDBC settings and the single shared Oracle connection.
 *
 * <p>The settings are read once, when the class is loaded, from the classpath
 * resource {@code /connection.properties}. If the resource is missing or cannot
 * be read, the problem is logged and the corresponding getters return
 * {@code null}.
 */
public class JdbcConnection {
    private static final String JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver";
    private static final String CONFIG_RESOURCE = "/connection.properties";
    private static final Logger logger = LoggerFactory.getLogger(JdbcConnection.class);

    private static Connection connection;
    private static String jdbcUrl;
    private static String userName;
    private static String password;
    private static String query;
    private static String interval;

    static {
        loadSettings();
    }

    private JdbcConnection() {   }

    /** Reads the settings from {@code /connection.properties} and always closes the stream. */
    private static void loadSettings() {
        final java.io.InputStream in = JdbcConnection.class.getResourceAsStream(CONFIG_RESOURCE);
        if (in == null) {
            // Properties.load(null) would throw a NullPointerException and abort class loading.
            logger.error("Resource {} was not found on the classpath", CONFIG_RESOURCE);
            return;
        }
        try {
            final Properties prop = new Properties();
            prop.load(in);
            jdbcUrl = prop.getProperty("jdbc.url");
            userName = prop.getProperty("jdbc.username");
            password = prop.getProperty("jdbc.password");
            query = prop.getProperty("jdbc.query");
            interval = prop.getProperty("poll.interval");
        } catch (IOException e) {
            logger.error("Could not read " + CONFIG_RESOURCE, e);
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                logger.warn("Could not close " + CONFIG_RESOURCE, e);
            }
        }
    }

    /**
     * Returns the shared connection, opening it on first use and re-opening it
     * when the cached one has been closed in the meantime.
     *
     * <p>Autocommit is switched off on every newly opened connection, so callers
     * have to commit their work themselves.
     *
     * @return the open connection, or {@code null} when the driver class cannot
     *         be loaded or the connection cannot be established (the cause is logged)
     */
    public static synchronized Connection getConnection() {
        try {
            if (connection != null && !connection.isClosed()) {
                return connection;
            }
            // Never hand out a stale, closed connection if the reconnect below fails.
            connection = null;
            Class.forName(JDBC_DRIVER);
            connection = DriverManager.getConnection(getJdbcUrl(), getUserName(), getPassword());
            connection.setAutoCommit(false);
        } catch (ClassNotFoundException e) {
            logger.error("JDBC driver " + JDBC_DRIVER + " is not on the classpath", e);
        } catch (SQLException e) {
            logger.error("Could not open a connection to " + getJdbcUrl(), e);
        }
        return connection;
    }

    /** @return the value of {@code jdbc.url}, or {@code null} if it was not loaded */
    public static String getJdbcUrl() {
        return jdbcUrl;
    }

    /** @return the value of {@code jdbc.username}, or {@code null} if it was not loaded */
    public static String getUserName() {
        return userName;
    }

    /** @return the value of {@code jdbc.password}, or {@code null} if it was not loaded */
    public static String getPassword() {
        return password;
    }

    /** @return the value of {@code jdbc.query}, or {@code null} if it was not loaded */
    public static String getQuery() {
        return query;
    }

    /** @return the raw text of {@code poll.interval}, or {@code null} if it was not loaded */
    public static String getInterval() {
        return interval;
    }
}
The JdbcConnection class reads the connection.properties file from the classpath and initializes the connection.
Now we are ready to create our Oracle spout:
package storm.contrib.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

import backtype.storm.utils.Utils;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.contrib.jdbc.JdbcConnection;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.*;
import java.util.*;

import static backtype.storm.utils.Utils.tuple;

/**
 * Spout that polls a table through JDBC, emits one tuple per row and deletes
 * the row after it has been emitted.
 *
 * <p>The connection, the query and the polling interval are taken from
 * {@link JdbcConnection}. The output fields are supplied by the caller and are
 * expected to match the columns returned by the query, in order.
 */
public class OracleSpout implements IRichSpout{
    private static final Logger logger = LoggerFactory.getLogger(OracleSpout.class);

    private SpoutOutputCollector collector;
    private TopologyContext context;
    private transient Connection connection;
    // true once the first poll has been attempted; every later poll waits `interval` ms first
    private boolean completed =false;
    private String query;
    private long interval;
    private Fields outputFields;

    /**
     * @param outputFields names of the emitted fields, one per selected column
     */
    public OracleSpout(final Fields outputFields){
        this.outputFields = outputFields;
    }

    public boolean isDistributed() {
        return false;
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.outputFields);
    }

    public Map<String, Object> getComponentConfiguration() {

        return null;
    }

    /**
     * Stores the collector and context and fetches connection, query and
     * polling interval from {@link JdbcConnection}.
     *
     * @throws NumberFormatException if the configured interval is missing or not a number
     */
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.context = topologyContext;
        // connect to DB
        connection = JdbcConnection.getConnection();
        query = JdbcConnection.getQuery();
        interval = Long.parseLong(JdbcConnection.getInterval());
    }

    /** Closes the connection, logging (not propagating) any failure. */
    public void close() {
        if(connection != null){
            try {
                connection.close();
            } catch (SQLException e) {
                logger.error("Could not close the connection", e);
            } finally {
                connection = null;
            }
        }
    }

    public void activate() {
    }

    public void deactivate() {
    }

    /**
     * Runs the query, emits every returned row as a tuple and deletes the row.
     *
     * <p>The first call polls immediately; every later call sleeps for the
     * configured interval first, whether or not the previous poll found rows.
     * Deleted rows are committed once the whole result set has been processed.
     * If the poll fails part-way, nothing is rolled back: deletes already
     * issued stay pending in the open transaction.
     */
    public void nextTuple() {
        if(completed){
            Utils.sleep(interval);
        }
        // Set unconditionally so that an empty table is not polled in a tight loop.
        completed = true;
        if(connection == null){
            return;
        }
        Statement stm = null;
        try {
            // A plain Statement is used because executeQuery(String) must not be
            // called on a PreparedStatement.
            stm = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
            ResultSet rs = stm.executeQuery(query);
            ResultSetMetaData rsmd = rs.getMetaData();
            int columnCount = rsmd.getColumnCount();
            while(rs.next()){
                // A fresh list per row: reusing one list would append this row's
                // values to those of all previous rows.
                List<Object> row = new ArrayList<Object>(columnCount);
                for(int col=1; col <= columnCount; col++){
                    row.add(getDataByCol(rs, rsmd.getColumnType(col), col));
                }
                getCollector().emit(tuple(row.toArray()));
                // delete the row
                rs.deleteRow();
            }
            // JdbcConnection switches autocommit off, so the deletes have to be committed here.
            connection.commit();
        } catch (SQLException e) {
            logger.error("Polling failed for query: " + query, e);
        } finally {
            if(stm != null){
                try {
                    // closing the statement also closes its result set
                    stm.close();
                } catch (SQLException e) {
                    logger.error("Could not close the statement", e);
                }
            }
        }
    }

    public void ack(Object o)   {
        // @todo delete the record
        //logger.info("ack:", o);
    }

    public void fail(Object o) {
        logger.info("fail: {}", o);
    }

    private SpoutOutputCollector getCollector() {
        return collector;
    }

    /**
     * Reads one column of the current row using a getter chosen by its JDBC type.
     *
     * @param rs      result set positioned on the row to read
     * @param colType a {@link Types} constant describing the column
     * @param colIdx  1-based column index
     * @return the column value; BLOB columns are returned as {@code byte[]},
     *         or {@code null} when the BLOB is SQL NULL
     * @throws SQLException if the column cannot be read
     */
    private Object getDataByCol(ResultSet rs, int colType, int colIdx) throws SQLException{
        Object colData;
        switch (colType){
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.CLOB:
                colData = rs.getString(colIdx);
                break;
            case Types.INTEGER:
            case Types.BIGINT:
                colData = rs.getLong(colIdx);
                break;
            case Types.DECIMAL:
            case Types.NUMERIC:
                colData = rs.getDouble(colIdx);
                break;
            case Types.DATE:
                colData = rs.getDate(colIdx);
                break;
            case Types.TIMESTAMP:
                colData = rs.getTimestamp(colIdx);
                break;
            case Types.BLOB:
                Blob blob = rs.getBlob(colIdx);
                // getBlob returns null for SQL NULL
                colData = (blob == null) ? null : getBytes(blob.getBinaryStream());
                break;
            default:
                colData = rs.getString(colIdx);
                break;
        }
        return colData;
    }

    /**
     * Drains and closes the stream.
     *
     * @return the stream content, or an empty array when the stream is
     *         {@code null} or reading fails (the failure is logged)
     */
    private byte[] getBytes(InputStream is){
        if(is == null){
            return new byte[0];
        }
        try {
            return IOUtils.toByteArray(is);
        } catch (IOException e) {
            logger.error("Could not read the BLOB content", e);
            return new byte[0];
        } finally {
            try {
                is.close();
            } catch (IOException e) {
                logger.warn("Could not close the BLOB stream", e);
            }
        }
    }
}
The class is self-explanatory. In the open method we initialize the JDBC connection; in the nextTuple method we query the table and emit a tuple for the parse bolt.
package storm.contrib.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.contrib.parse.ParseMessage;

import javax.xml.stream.XMLStreamException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static backtype.storm.utils.Utils.tuple;

public class ParseMsgBolt implements IRichBolt {
    private OutputCollector collector;
    private static final Logger logger = LoggerFactory.getLogger(ParseMsgBolt.class);
    TopologyContext context;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.context = topologyContext;
    }

    public void execute(Tuple tuple) {
        // parse
        Map<String, String> msgTuple = null;
        if(logMsg.length > 0){
            ParseSmevMessage3  parseMessage = new ParseSmevMessage3();
            List ls = new ArrayList();
            try {
                msgTuple =  parseMessage.parse(new String(logMsg));
                // add logid first;
                ls.add(logId);
                // add parsed fields values
                ls.addAll(msgTuple.values());
                this.collector.emit(ls);
                msgTuple.clear();
            } catch (XMLStreamException e) {
                // send failure msg
                collector.fail(tuple);
                logger.error(e.getMessage());
            }
        }
        //send ack
        this.collector.ack(tuple);
    }

    public void cleanup() {

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("logid",
                                                "exct",
                                                "asen",
                                                "origc",
                                                "ard",
                                                "arc",
                                                "tcode",
                                                "asenc",
                                                "orign",
                                                "certsn",
                                                "arcn"));
                                                "request_id_ref",
                                                "origin_request_id_reg",
                                                "case_number",
                                                "cert",
                                                "messageid",
                                                "srv_sid",
                                                "test_msg",
                                                "status",
                                                "exchange_type"));
    }
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

package ru.atc.smev.pig.utils;

import com.ximpleware.*;

import java.io.ByteArrayInputStream;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Extracts a fixed set of values from an XML message with VTD-XML.
 *
 * <p>Every XPath in {@code SEARCH_TAGS} is evaluated against the document and
 * the text it selects is stored in the result under the short name mapped to
 * that XPath.
 */
public class ParseSmevMessage3 {
    private final static String CERT_TYPE = "X509";
    private static final String CERTSN = "certsn";
    private static final String ROOT_TAG = "root_tag";

    /** XPath expression -> key under which the selected text is returned. Read-only. */
    private static final Map<String, String> SEARCH_TAGS;
    static{
        final Map<String, String> tags = new HashMap<String, String>(17);
        tags.put("/*[local-name() = 'Envelope']/*[local-name() = 'Header']/*[local-name() = 'Security']/*[local-name() = 'BinarySecurityToken']/text()", CERTSN);
        //tags.put("/Envelope/root_tag", ROOT_TAG);
        tags.put("/*[local-name() = 'Envelope']/*[local-name() = 'Header']/*[local-name() = 'Header']/*[local-name() = 'MessageId']/text()", "msgid");

        tags.put("//*[local-name() = 'Message']/*[local-name() = 'Sender']/*[local-name() = 'Code']/text()", "asenc");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'Sender']/*[local-name() = 'Name']/text()", "asen");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'Recipient']/*[local-name() = 'Code']/text()", "arc");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'Recipient']/*[local-name() = 'Name']/text()", "arcn");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'Originator']/*[local-name() = 'Code']/text()", "origc");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'Originator']/*[local-name() = 'Name']/text()", "orign");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'Date']/text()", "ard");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'TypeCode']/text()", "tcode");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'ExchangeType']/text()", "exct");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'ServiceCode']/text()", "serc");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'RequestIdRef']/text()", "reqidr");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'OriginRequestIdRef']/text()", "origridr");
        tags.put("//*[local-name() = 'Message']/*[local-name() = 'CaseNumber']/text()", "can");
        SEARCH_TAGS = Collections.unmodifiableMap(tags);
    }

    /**
     * Decodes a Base64 encoded X.509 certificate and returns its serial number.
     *
     * @param cert Base64 text of the certificate
     * @return the serial number in decimal notation
     * @throws CertificateException if the decoded bytes are not a valid certificate
     */
    private String getSerialNumber(final String cert) throws CertificateException {
        byte[] derFile = org.bouncycastle.util.encoders.Base64.decode(cert.getBytes());

        final CertificateFactory cf = CertificateFactory.getInstance(CERT_TYPE);
        final X509Certificate x509 = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(derFile));
        // radix 10: the serial number is returned in decimal, not hex
        return x509.getSerialNumber().toString(10);
    }

    /**
     * Parses the document and collects the text selected by every search XPath.
     *
     * <p>The value stored under {@code certsn} is the serial number of the
     * certificate found in the security token, not the token text itself. This
     * no longer depends on the namespace prefix used by the token element.
     *
     * <p>Any parsing, XPath or navigation error is propagated to the caller
     * rather than printed and swallowed, so a returned map is never a silently
     * truncated result.
     *
     * @param xml the XML document; must not be {@code null}
     * @return map from result key to selected text; keys whose XPath matched
     *         nothing are absent
     * @throws IllegalArgumentException if {@code xml} is {@code null}
     * @throws Exception if the document cannot be parsed, an XPath cannot be
     *         evaluated, or the certificate cannot be decoded
     */
    public Map<String, String> parse (String xml) throws Exception{
        if (xml == null) {
            throw new IllegalArgumentException("xml must not be null");
        }
        final Map<String, String> result = new HashMap<String, String>(17);
        final VTDGen vg = new VTDGen();
        vg.setDoc(xml.getBytes());
        vg.parse(true);
        final VTDNav vn = vg.getNav();
        final AutoPilot ap = new AutoPilot();
        ap.bind(vn);
        for(Map.Entry<String, String> search : SEARCH_TAGS.entrySet()){
            final String tag = search.getValue();
            ap.selectXPath(search.getKey());
            int i;
            while( (i = ap.evalXPath())!=-1 ){
                final String text = vn.toString(i);
                result.put(tag, CERTSN.equals(tag) ? getSerialNumber(text) : text);
            }
            ap.resetXPath();
        }
        return result;
    }
}
I have used the VTD-XML library to parse the message; the bolt emits the parsed fields to the Cassandra bolt, which stores the row in the Cassandra DB.
Here is the topology class:
package storm.contrib.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.contrib.cassandra.bolt.BatchingCassandraBolt;
import backtype.storm.contrib.cassandra.bolt.CassandraBolt;
import backtype.storm.contrib.cassandra.bolt.DefaultBatchingCassandraBolt;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.contrib.bolt.ParseMsgBolt;
import storm.contrib.spout.OracleSpout;

/**
 * Wires the Oracle spout, the parse bolt and the Cassandra bolt together and
 * submits the topology to an in-process {@link LocalCluster}.
 */
public class CassandraBoltTopology {
    private static final String SPOUT_ID = "oracle-reader";
    private static final String PARSER_ID = "Msgparser";
    private static final String WRITER_ID = "save to cassandra";

    public static void main(String[] args) {
        // wiring: spout -> parser -> cassandra writer, shuffle grouping on both hops
        final TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout(SPOUT_ID, new OracleSpout(oracleColumns()));
        topology.setBolt(PARSER_ID, new ParseMsgBolt()).shuffleGrouping(SPOUT_ID);
        topology.setBolt(WRITER_ID, new CassandraBolt("stormcf", "logid")).shuffleGrouping(PARSER_ID);

        // Cassandra endpoint and topology settings
        final Config settings = new Config();
        settings.put(CassandraBolt.CASSANDRA_HOST, "192.168.XXX.XXX");
        settings.put(CassandraBolt.CASSANDRA_PORT, 9160);
        settings.put(CassandraBolt.CASSANDRA_KEYSPACE, "stormks");
        settings.setDebug(true);
        settings.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        // run the topology
        final LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("Getting-Started-Toplogie", settings, topology.createTopology());
    }

    /** Output field names of the Oracle spout, one per table column. */
    private static Fields oracleColumns() {
        return new Fields(
                "logobject_id",
                "oper_name",
                "sender_code",
                "sender_name",
                "recipient_code",
                "recipient_name",
                "originator_code",
                "originator_name",
                "request_date",
                "type_code",
                "service_code",
                "request_id_ref",
                "origin_request_id_reg",
                "case_number",
                "cert",
                "messageid",
                "srv_sid",
                "test_msg",
                "status",
                "exchange_type");
    }
}
Here we first declare the column names of the table from which we collect the data. In the current Storm version we cannot define output fields dynamically. Next we set up the bolts accordingly and configure the CassandraBolt. The project runs well on a two-machine cluster without any errors. I also plan to add a transaction feature to the spout. I hope this will help someone get started with Storm. Here is the properties file:
jdbc.url=jdbc:oracle:thin:@192.168.1XX.XX:1521:TEST
jdbc.username=orawsm
jdbc.password=orawsm
jdbc.query=SELECT t.* FROM LOG_OBJECTS t

poll.interval=10000

3 comments:

Sergio Moral said...

Hi,

really interesting article. Looking forward to reading more of your work with Cassandra and Storm.

Sergio Moral said...

Hi,

really interesting article. Looking forward to reading more of your work with Storm and Cassandra.

Harish said...

Hi Shamim,


I have the similar requirement in my project. i gone threw your code,its very nice.
But You missed to update the 2nd BOLT (Cassendrabolt) in this page .Please provide code how you are taking data from 1st bolt and updating it to cassendra db.

your reply is appreciated.