Tuesday

Database DML/DDL event processing with Oracle Database change notification

A few years ago in one of my blog post, i described how to use Oracle database changed notification to update HazelCast cache in application server layer. For now this is one of the finest use case of using Oracle database changed notification, but you can also use this possibilities for solving others problem such as event processing in legacy table. For instance, you are developing dispute system for any Banking system. For banking core system, dispute is as a another banking transaction, when any dispute comes from any client, operator of bank should react on this transaction. Most of the time disputes keeps in same storage (tables) along with another transactions. Such type of tables can keeps billions of rows, and when you would like to get notified when a few of the rows changes, you have a few options in your hand:
1) Poll periodically, schedular which will poll the whole table periodically to get changed data.
2) Using oracle trigger to send some notification (stored procedure or oracle embedded java implemention)
3) Using Oracle Database change notification or Oracle continuous Query Notification
if you have a few billions of rows in OLTP system, first option is not a option at all, using trigger can also hit performance issue in OLTP system. For asynchronous event processing from oracle objects, Oracle database change notification is one the best possible option. Oracle provides three different implementing of change notification: Oracle continuous query notification and Oracle database change notification. Oracle continuous notification provides only C and pl/sql implements, in the other hand Oracle changed notification has java implementation. To enable changed notification you should only have to grant the privilege as follows and you are ready to go:
grant change notification to USER_NAME;
One thing you have to mentioned that, with notification you will only get the ROWID and objects name or ID not the entire row, it's means you will have to do extra select query by rowid to get the row. Below illustration of the simple flow of the process:
The explanation of the steps in above illustration is as follows:
1) In this example, client register the lister to certain user Oracle objects and the listener also.
2) The database populate the registration information in the data dictionary
3) Any partner application making any changes in User Objects, it's may be any DML/DDL operations
4) Oracle JOBQ background process is notified of a new change notification message
5) JOBQ process notify the client app listener.
6) Client listener gets the ROWID and the user objects ID and sending the information to the MQ server, such as kafka
7) Subscriber of the topic of Kafka server getting the information of ROWID
8) Processor query the User objects to get the result set and start processing the updated information
Lets take a quick look щи pseudo code (all the code you will find in the git hub)

Registration of the notification:
package com.blu.db;

import oracle.jdbc.OracleStatement;
import oracle.jdbc.dcn.*;
import oracle.jdbc.driver.OracleConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class DBNotifactionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DBNotifactionConsumer.class);

    private NsiOracleConnection oracleConnection;
    private static final Properties properties = new Properties();
    private String queryString;

    public String getQueryString() {
        return queryString;
    }

    public void setQueryString(String queryString) {
        this.queryString = queryString;
    }

    static{
        properties.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
        properties.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true"); //Activates query change notification instead of object change notification.
    }

    public DBNotifactionConsumer(NsiOracleConnection oracleConnection) {
        this.oracleConnection = oracleConnection;
    }

    public NsiOracleConnection getOracleConnection() {
        return oracleConnection;
    }

    public void registerNotification() throws SQLException{
        DatabaseChangeRegistration databaseChangeRegistration =  getOracleConnection().getConnection().registerDatabaseChangeNotification(properties);
        databaseChangeRegistration.addListener(new NsiListner());
        Statement stm = getOracleConnection().getConnection().createStatement();
        ((OracleStatement) stm).setDatabaseChangeRegistration(databaseChangeRegistration);
        ResultSet rs;
        for(String queryString : getQueryString().split(";")){
            rs = stm.executeQuery(queryString);
            while(rs.next()){
            }
            rs.close();
        }
        // get tables from dcr
        String[] tables = databaseChangeRegistration.getTables();
        for(String str : tables){
            LOGGER.info("Registreted Tables:{}", str);
        }
        if(!stm.isClosed()){
            stm.close();
        }
    }
}
In the above code i used query changed notification.
Listener without kafka is very simple
public class NsiListner implements DatabaseChangeListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(NsiListner.class);

    @Override
    public void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent) {
        for(QueryChangeDescription qcd : databaseChangeEvent.getQueryChangeDescription()){
            LOGGER.info("Query Id: {}", qcd.getQueryId());
            LOGGER.info("Event Type: {}", qcd.getQueryChangeEventType().name());
            for(TableChangeDescription tcd : qcd.getTableChangeDescription()){
                //ClassDescriptor descriptor = OracleChangeNotificationListener.this.descriptorsByTable.get(new DatabaseTable(tcd.getTableName()));
                LOGGER.info("table Name: {}", tcd.getTableName()); // table name is empty
                LOGGER.info("Object ID: {}", tcd.getObjectNumber()); // use object id]]
                for(RowChangeDescription rcd : tcd.getRowChangeDescription()){
                    LOGGER.info("Row ID:" + rcd.getRowid().stringValue() + " Operation:" + rcd.getRowOperation().name());
                }
            }

        }

    }
}
If you want to fault tolerant your listener application, you can use several listener application in a cluster and use leader election to run one listener at time. Here is the pseudo code of the simple leader elector, note that i am using curator to avoid boiler plate code.
package com.blu.curator;

import com.blu.db.DBNotifactionConsumer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.concurrent.atomic.AtomicInteger;

public class Leader extends LeaderSelectorListenerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleClient.class);
    private String clientName;
    private CuratorFramework client;
    private String path;
    private LeaderSelector leaderSelector;
    // oracle change notification
    private ApplicationContext ctx;// = new ClassPathXmlApplicationContext("spring-context.xml");
    private DBNotifactionConsumer consumer;//= (DBNotifactionConsumer) ctx.getBean("consumer");

    public Leader(String clientName, CuratorFramework client, String path) {
        this.clientName = clientName;
        this.client = client;
        this.path = path;
        leaderSelector = new LeaderSelector(this.client,this.path, this);
        leaderSelector.autoRequeue();
        // initialize oracle change notification
        ctx = new ClassPathXmlApplicationContext("spring-context.xml");
        consumer = (DBNotifactionConsumer)ctx.getBean("consumer");
    }

    @Override
    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        // run oracle notification here

        consumer.registerNotification();
        System.out.println(this.clientName + " is now the leader!!");
        Thread.sleep(Integer.MAX_VALUE);
    }
    public void start(){
        leaderSelector.start();
    }
}
When one of the listener will be not available, another one will replace it and continue to getting notification from Oracle DB. That's enough for today, guess above information will help somebody to quick start with Oracle notification changed.

Sunday

Resilient, are you ready for your application?

UP1: If you are planning to use Reactive programming, we recommend you to read the book "High performance in-memory computing with Apache Ignite".

Nowadays, term Reactive programming and Reactive manifesto is became trend. Blog post, articles and presentations are all over. Why peoples getting crazy with it? or what's the problem it could solve? The easiest way to answer the above questions is to think about the requirements we have building application these days.
We have to:
1) Modular - Module can help to maintain the application, it can be go offline and getting back to online without breaking all the system down;
2) Scalable - this way we can scale vertically or horizontally to handle a large amount of data or transactions;
3) Resilient - system can be getting back online and be fault tolerant;
4) Responsive - this means fast and available.
Here is the paradigm of reactive manifesto:
For most of the part, we already have a few framework such as Akka, Vert.x, Hystrix, ReactiveFx to develop reactive application. Today i am going to highlight only on resilient properties of any application. Probably in your development process, you had to had used any third party libraries or any 3rd party services through network. When these type of 3rd party services or libraries going down or not available, your application suffers in timeout exception or even more crash for high value waiting requests. For example, in our last project we are using 3rd party weather forecast service and define residence of city through ip address in our portal. When these services not available or not responses in time, response time of the portal increases and a few times system crashes because a lot of requests waited in thread pool. These could be happen in any external resources such as database connection, MQ connection e.t.c. If you have to recover from these situation, we have to need short circuit design pattern. Company Netflix a few years ago developed a framework to design such application, which also contain bulkhead patterns. Today i am going to describe the framework Hystrix and will develop a very quick start maven project to show it's capabilities.
Here is the illustration, how it works:
Design is very simple, "When you use Hystrix to wrap each underlying dependency. Each dependency is isolated from one other, restricted in the resources it can saturate when latency occurs, and covered in fallback logic that decides what response to make when any type of failure occurs in the dependency".
To show it's capabilities we developed a simple rest service with two methods (getweather, getcitiybyip), which will play as a 3rd party services.
public interface IServices {
    public String getWeather(String cityName);
    public String getCityByIp(String ipAddress);
}
And we also have a service facade of rest services based on Hystrix, which will delegate the request to the 3rd party services. If the 3rd party service not available, the circuit will open and no request will delegate to the service.
Here is the source code of the Hystrix service facade:
package com.blu.rest;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;

/**
 * Created by shamim on 16/07/15.
 * Service facade
 */
// add json
@Path("/servicefacade")
public class HystrixServiceFacade extends HystrixCommand{
    private static final Logger LOGGER = LoggerFactory.getLogger(HystrixServiceFacade.class);
    private static final String CONTEXT_ROOT="http://localhost:8080/test/rest/serviceimpl/";
    private String ipAddress;


    public HystrixServiceFacade() {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("getCityByIp"))
                        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(100))
                        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withCircuitBreakerErrorThresholdPercentage(3)
                        .withCircuitBreakerSleepWindowInMilliseconds(2000)
                        )
        );
    }

    @GET
    @Path("/getcitybyip/{param}")
    // http://localhost:8080/test/rest/servicefacade/getcitybyip/192.168.121.11
    public Response echo (@PathParam("param") String ipAddress){
        LOGGER.info("Invoked with Parameter!!!" + ipAddress);
        this.ipAddress = ipAddress;


        String resp = this.execute();
        return Response.status(200).entity(resp).build();
    }

    @Override
    protected String run() throws Exception {
        // invoke 3rd party service
        final String uri = CONTEXT_ROOT + "getcitybyip/" +this.ipAddress;
        Client client = Client.create();
        WebResource webResource = client.resource(uri);
        ClientResponse response = webResource.accept("application/json").get(ClientResponse.class);

        if (response.getStatus() != 200) {
            LOGGER.info("Error:" + response.getStatus());
        }
        return response.getEntity(String.class);


    }

    @Override
    protected String getFallback() {
        return "Fall back for get City by Ip";
    }
    private GetWeatherCommand getWeatherCommand = new GetWeatherCommand();

    @GET
    @Path("/getweather/{param}")
    public Response getWeather(@PathParam("param")String cityName){
        LOGGER.info("Invoked getWeather with param: "+ cityName);
        getWeatherCommand.cityName = cityName;
        String resp = getWeatherCommand.execute();

        return Response.status(200).entity(resp).build();
    }
    class GetWeatherCommand extends HystrixCommand{
        private String cityName;
        public GetWeatherCommand() {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetWeather"))
                            .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(100))
                            .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withCircuitBreakerErrorThresholdPercentage(5)
                            .withCircuitBreakerSleepWindowInMilliseconds(2000)
                            )
            );
        }

        @Override
        protected String run() throws Exception {
            // invoke 3rd party service
            final String uri = CONTEXT_ROOT +"getweather/"+this.cityName;
            Client client = Client.create();
            WebResource webResource = client.resource(uri);
            ClientResponse response = webResource.accept("application/json").get(ClientResponse.class);

            if (response.getStatus() != 200) {
                LOGGER.info("Error {}", response.getStatus());
            }
            return response.getEntity(String.class);
        }
        // static fall back
        @Override
        protected String getFallback() {
            return "Fall Back for getWeather";
        }
    };
}
Note that, here is two type of implementation, one when we inherited from Hystrix command, another one when we are using inner Hystrix command. Full source code you will found here in github.com.
Now lets run the simple rest service by the following command:
mvn jetty:run
if everything goes well, your service should be available in url http://localhost:8080/test/rest/serviceimpl/getweather/moscow
Now lets run the hystrix service facade service
mvn jetty:run -Djetty.http.port=9093
Service facade will be available in port 9093 and you can reach it by http://localhost:9093/test/rest/servicefacade/getweather/moscow
Also i have configured HystrixStreamServlet for sending matrices to Hystrix Dashboard.
<servlet>
    <description></description>
    <display-name>HystrixMetricsStreamServlet</display-name>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
</servlet>

<servlet-mapping>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <url-pattern>/hystrix.stream</url-pattern>
</servlet-mapping>
StreamServlet will available in http://localhost:9093/test/hystrix.stream
Also you can clone the Hystrix dash board from github and follow the instruction to build and run the dashboard. In my case dashboard is available in http://localhost:7979/hystrix-dashboard.
Now we can start sending massive requests to check how it works. For these you can use any Curl command or using my com.blu.reactive.JersyClient class to start sending request. For demonstration purpose of network failure i have add following randomly failure code to the getCityByIp method as follows:
package com.blu.rest;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import java.util.HashMap;
import java.util.Map;

@Path("/serviceimpl")
public class ServicesImpl implements IServices {
    private static final Map WEATHERS= new HashMap(){{
        put("moscow", "29");
        put("petersburg", "21");
        put("berlin", "31");
        put("bercelona", "33");
    }};
    private static final Map CITIES = new HashMap(){{
        put("192.168.121.11","moscow");
        put("192.168.124.21","petersburg");
        put("192.138.111.31","berlin");
        put("192.168.132.61","bercelona");
    }};

    @GET
    @Path("/getweather/{param}")
    @Override
    public String getWeather(@PathParam("param") String cityName) {
        return WEATHERS.get(cityName);
    }
    @GET
    @Path("/getcitybyip/{param}")
    @Override
    public String getCityByIp(@PathParam("param")String ipAddress) {
        // Randomly failure
        final long ctime = System.currentTimeMillis();

        if(ctime % 20 == 0){
            throw new RuntimeException("Randomly failure!!");
        }
        if(ctime % 30 == 0){
            try {
                Thread.sleep(7000);
            } catch (InterruptedException e) {
                // do nothing
            }
        }
        return CITIES.get(ipAddress);
    }
}
After running the JersyClient class, you can observed the Hystrix Dashboard and monitor the services health.
When there is no service failure, circuit is closed and delegated to the 3rd party service as follows:
When randomly failure occurs or you stop the 3rd party service, Hystrix discovered the network failure and circuit is open, then no service requests delegates to the 3rd party services and hystrix service facade returns the fallback. After 2 second (it configurable by withCircuitBreakerSleepWindowInMilliseconds(2000)) hystrix passes a few requests and if the service keep alive it continue to passed all the request. Here is the illustration when circuit is open:

Certainly you can control how much service failure will cause circuit open, in our case it's 5% of all request. In Hystrix framework there are a few many interesting capabilities such as bulkhead, timeout control, but for today it's enough. Happy Weekend.

Microservices - tools and implementations

UP1: If you are planning to migrate to Microservice, we recommend you to read the book "High performance in-memory computing with Apache Ignite".

One of the challenging thing in Microservices world is not the implementation of the services, rather it's monitoring and the management. In the time of writing the blog, we have a few frameworks and tools to implements microservices such as Dropwizard, Spring boot and vertx. Complexity grows when you have a lot of independent micro services deployed and running over cloud infrastructure. It really a pain full task to monitor and manage all the services through 24*7. However Ansible, puppet, docker and logstash tools can help you to build such a management platform for micro services but they are not always sufficient. To solve the above described problem Jboss project release such a management tool for micro services named fabric8. Fabric8 provides following possibilities such as management, continuous delivery and simple integration platform based on Apache Camel project. This open source project based on google kubernetes and openshiftV3. Every application deployed in fabric8 is independent and running in separate java container, which you can stop, start or restart. Any moment you can increase or decrease the number of instance of any application in fabric8 with a single mouse click. In this blog i am going to quick start with fabric8, install and run in a single machine and will try to deploy one simple application from it's quick start example.
For installation of fabric8 you can follow the getting started guide, i have choose the Vagrant way. If you already familiar with Vagrant and docker then installation process will be easy for you. If everything goes fine with your installation you should have the following page.
Now you can play with some of the pre installed application, for example quickstart-rest. If any of them is not running, you can check application logs. If you will got following errors in web console or in any application log:
index.docker.io: no such host
add the following name server in you /etc/resolv.conf file and restart the docker instance
nameserver 8.8.8.8
nameserver 8.8.4.4
sudo systemctl restart docker
Now we can clone the quick start project from the git hub and try to deploy any of the example.
The example is well documented but for the first time it may not works.
Build the docker image
mvn package docker:build
if you will get error something like these
index.docker.io: no such host, you should add the following IP address in you /etc/hosts host machine.
52.0.31.125 index.docker.io
52.0.31.125 registry-1.docker.io
If you are using vagrant image in an single machine, do not need to push the image in the docker hub, rather than you can apply kubernetes configuration to the installed platform.
mvn fabric8:apply
Above command will deploy the given application with meta data to local fabric8.
Now you can increase the instance of the application by clicking the pod column and try to invoke the servlet by the following URL:
http://quickstart-camelservlet.vagrant.f8/camel/hello?name=shamim
the result should be as follows:
From here you can develop your application with maven archetypes provides by fabric8 team and enjoy your micro services.