Friday, May 18, 2012

Another message redelivery pattern with WSO2 ESB

In the articles Implementing Store and Forward Messaging Patterns With WSO2ESB Part 1 and Implementing Store and Forward Messaging Patterns With WSO2ESB Part 2, I introduced WSO2 Message Processors and Message Stores and explained how to implement store-and-forward messaging patterns with them.

In this blog post I'm going to show how to implement a redelivery messaging pattern I recently built. In the article Implementing Store and Forward Messaging Patterns With WSO2ESB Part 2 I described how to implement a redelivery pattern with the Message Forwarding processor. In this post I'm going to show another redelivery pattern, one which can’t be achieved with the Message Forwarding processor.

Following is the requirement:

  1. The client invokes a backend service through the ESB.
  2. If the first invocation is successful, send the response back to the client; if the service is not available, send a fault message back.
  3. On failure, retry delivering the message N times, where each retry happens after an interval of T.
  4. After N failed attempts, drop the message (or store it somewhere).

Let's see how we can implement this using WSO2 ESB.

For this I'll be using WSO2 ESB mediation sequences, Message Stores and Message Processors. WSO2 mediation constructs are designed to act as a set of mediation building blocks from which we can compose different EIP/messaging patterns. So in this scenario we will be composing sequences, mediators, Message Stores and Message Processors to build this messaging pattern.

Message Flow : 

Success scenario:
  1. The proxy service accepts the message and sends it to the SendToService sequence.
  2. The SendToService sequence sends the message to the service.
  3. The response comes back to the proxy service's out sequence, from which we send the response to the client.


(Note: In this case the SendFault sequence is registered as the fault handler for the SendToService sequence, which means that if the backend service is not available the message will go to the SendFault sequence.)

When the service is not available:
  1. The proxy service accepts the message and sends it to the SendToService sequence.
  2. The SendToService sequence sends the message to the service.
  3. Since the endpoint is down, the SendFault sequence gets executed with the failed message.
  4. We check for a property in the message named retry_count. If it is not there, this is the first failure and we go to step 5; if retry_count is there, we go to step 6.
  5. Clone the message: in one clone branch we create a fault message and send it back to the client; in the other branch we set the retry_count property to 1 and store the message.
  6. We check whether retry_count > N. If so, we drop the message; if not, we increment retry_count using an XPath expression and store the message.

Note: Here a Message Sampling Processor is configured to poll messages from the Message Store every T interval and send them to the SendToService sequence.

Following is the flow chart of the message flow described above.







Below is a sample ESB configuration XML which implements this.

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
    <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
        <parameter name="cachableDuration">15000</parameter>
    </registry>
    <proxy name="InOnlyProxy" transports="https http" startOnLoad="true" trace="disable">
        <target>
            <inSequence>
                <property name="target.endpoint" value="SimpleStockQuoteService"/>
                <sequence key="SimpleSQSender"/>
            </inSequence>
            <outSequence>
                <send/>
            </outSequence>
        </target>
    </proxy>
    <endpoint name="SimpleStockQuoteService">
        <address uri="http://127.0.0.1:9000/services/SimpleStockQuoteService">
            <suspendOnFailure>
                <errorCodes>-1</errorCodes>
                <progressionFactor>1.0</progressionFactor>
            </suspendOnFailure>
            <markForSuspension>
                <errorCodes>-1</errorCodes>
            </markForSuspension>
        </address>
    </endpoint>
    <sequence name="SimpleSQFault">
        <log level="custom">
            <property xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope" xmlns:ns="http://org.apache.synapse/xsd" xmlns:ns3="http://org.apache.synapse/xsd" name="retry_count" expression="get-property('retry_count')"/>
        </log>
        <filter xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope" xmlns:ns="http://org.apache.synapse/xsd" xmlns:ns3="http://org.apache.synapse/xsd" xpath="get-property('retry_count')">
            <then>
                <property name="retry_count" expression="number(get-property('retry_count'))+1" scope="default"/>
                <filter xpath="get-property('retry_count') > 10">
                    <then>
                        <log>
                            <property name="Dropping--Count" expression="get-property('retry_count')"/>
                        </log>
                        <drop/>
                    </then>
                </filter>
            </then>
            <else>
                <property name="retry_count" value="1" scope="default"/>
                <clone continueParent="true">
                    <target>
                        <sequence>
                            <makefault version="soap11">
                                <code xmlns:tns="http://www.w3.org/2003/05/soap-envelope" value="tns:Receiver"/>
                                <reason expression="get-property('ERROR_MESSAGE')"/>
                            </makefault>
                            <send/>
                            <drop/>
                        </sequence>
                    </target>
                </clone>
            </else>
        </filter>
        <clone>
            <target>
                <sequence>
                    <store messageStore="SimpleMS"/>
                </sequence>
            </target>
        </clone>
    </sequence>
    <sequence name="SimpleSQSender" onError="SimpleSQFault">
        <send>
            <endpoint key="SimpleStockQuoteService"/>
        </send>
    </sequence>
    <sequence name="fault">
        <log level="full">
            <property name="MESSAGE" value="Executing default 'fault' sequence"/>
            <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
            <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
        </log>
        <drop/>
    </sequence>
    <sequence name="main">
        <in>
            <log level="full"/>
            <filter source="get-property('To')" regex="http://localhost:9000.*">
                <send/>
            </filter>
        </in>
        <out>
            <send/>
        </out>
        <description>The main sequence for the message mediation</description>
    </sequence>
    <messageStore name="SimpleMS"/>
    <messageProcessor class="org.apache.synapse.message.processors.sampler.SamplingProcessor" name="Processor" messageStore="SimpleMS">
        <parameter name="interval">3000</parameter>
        <parameter name="sequence">SimpleSQSender</parameter>
    </messageProcessor>
</definitions>

For this I have used the SimpleStockQuoteService, which comes with the WSO2 ESB samples, as the backend service. In this configuration the message is dropped after N retries, but we can use the store mediator to store it in another message store if needed, as sketched below.
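As a minimal sketch of that variant (the dead-letter store name FailedMessagesMS is an assumed example and would need to be defined in the configuration alongside SimpleMS), the drop inside the retry_count > 10 filter of the SimpleSQFault sequence can be preceded by a store mediator so the exhausted message is kept instead of discarded:

<then>
    <log>
        <property name="Storing-Failed-Message" expression="get-property('retry_count')"/>
    </log>
    <!-- FailedMessagesMS is an assumed dead-letter message store, defined like SimpleMS -->
    <store messageStore="FailedMessagesMS"/>
    <drop/>
</then>

A separate message processor, or an administrator, can later inspect or replay the messages collected in that store.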


Monday, April 9, 2012

Distributed Coordination with Apache Zookeeper - A Java Approach

Apache ZooKeeper is an open source framework which can be used for distributed coordination. To understand what ZooKeeper does, we have to go back to the concurrent programming class we did at university. Think about how we coordinated different threads or processes: most of the time we used global variables.

Actually there are two ways to achieve coordination among processes:

1) Message Passing
2) Shared Memory approach.


Now let's take that problem to the next level: how do we coordinate threads/processes that are running on different machines/nodes in a distributed system? The above two approaches are still the solution at this level as well. (Yes, it's a pattern in science :) )

There are a lot of messaging frameworks and distributed shared memory abstractions available which can be used to achieve distributed coordination.

I like to think of Apache ZooKeeper as a coordination framework which gives a shared memory abstraction to distributed processes: it provides a global, file-system-like data structure and operations to manage it. It also provides an additional set of features that make life easier for developers writing coordination algorithms, such as being able to subscribe to node changes.
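To get a feel for that abstraction, here is a minimal Java sketch (separate from the lock implementation discussed later) that connects to a ZooKeeper instance assumed to be running at 127.0.0.1:2181, creates a znode, reads it back and subscribes to changes on it. The class name, the /demo path and the stored data are purely illustrative:

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZNodeSketch {

    public static void main(String[] args) throws Exception {
        // Block until the client session is actually connected before using it
        final CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (Event.KeeperState.SyncConnected == event.getState()) {
                    connected.countDown();
                }
            }
        });
        connected.await();

        // Create a node in the global, file-system-like namespace (path and data are illustrative)
        if (zk.exists("/demo", false) == null) {
            zk.create("/demo", "hello".getBytes(StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // Read it back and register a one-off watch that fires on the next change to the node
        Stat stat = new Stat();
        byte[] data = zk.getData("/demo", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("Change seen on /demo: " + event.getType());
            }
        }, stat);
        System.out.println("Read '" + new String(data, StandardCharsets.UTF_8)
                + "' (version " + stat.getVersion() + ")");

        zk.close();
    }
}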

Anyway, the bottom line is that it's just a global data structure that all processes/nodes can see. No magic! You still have to write the distributed coordination algorithms yourself if you need coordination.

I was working on an effort to implement a distributed message broker, in which I worked on creating some distributed coordination algorithms.

We were using Apache ZooKeeper to handle some of the coordination tasks. Most of the coordination algorithms are specific to the problem domain we were working in, such as balancing the distribution of queue workers, handling node failures, node_id coordination, etc. In this blog I'll provide an example, with Java code, of how to implement a distributed resource lock using ZooKeeper. It is one of the simplest and most common use cases, and I think it is a good starting point for Java programmers who want to start working with ZooKeeper.

This is a generic problem, and there is a nice recipe provided by ZooKeeper at http://zookeeper.apache.org/doc/trunk/recipes.html, but it may be hard to understand on first read. Even I had to explain to a few people how it works, so I'll explain the algorithm first and then show the code.

Following are the steps of the algorithm (http://zookeeper.apache.org/doc/trunk/recipes.html#sc_recipes_Locks):

  1. Call create( ) with a pathname of "_locknode_/guid-lock-" and the sequence and ephemeral flags set. The guid is needed in case the create() result is missed. See the note below.
  2. Call getChildren( ) on the lock node without setting the watch flag (this is important to avoid the herd effect).
  3. If the pathname created in step 1 has the lowest sequence number suffix, the client has the lock and the client exits the protocol.
  4. The client calls exists( ) with the watch flag set on the path in the lock directory with the next lowest sequence number.
  5. If exists( ) returns false, go to step 2. Otherwise, wait for a notification for the pathname from the previous step before going to step 2.

To explain the steps a bit more: in step one we create a node with a name like *guid*-lock-0001, where guid is a unique id generated by the client, which the client can later use to identify the node it created. The number at the end is the sequence number appended by ZooKeeper, since we set the sequential flag. Also note that we set the ephemeral flag when creating the node, which means the node will automatically get deleted once the client connection which created it terminates.

In steps 2 and 3 we get all the lock nodes, and if our node is the one with the lowest sequence number (we can use the guid to determine which node was created by us), that means we have the lock, so we can do our job and exit. (Note that when we exit, our connection to ZooKeeper terminates and ZooKeeper deletes the node created by our client.)

If our node is not the one with the lowest sequence number, our client reaches the 4th step of the algorithm, which means it does not have the lock and must wait until it acquires it.

So in the 4th step we make an exists() call on the node with the next lowest sequence number (relative to our node, not the 2nd lowest number in the whole node set) and register a watch for it. The idea is to avoid any one node being kept busy handling watches: each client watches only one node, and each node is watched by only one client.

See the image below. Here N1..N4 are nodes created by clients, and N1 holds the lock.




So once N1 releases the lock (i.e. its client connection terminates), the N1 node is deleted. N2 sees this, re-runs the algorithm from step 2 and becomes the lock holder.

Here someone will ask a fair question: what if the client that created N3 dies? That case is also handled by this algorithm. Say the N3 client dies. In that case N4 is notified, goes to step 2 of the algorithm, gets all the children, realises it is not the lock holder, and then sets a watch on N2, which is now the next lowest node to it.

The picture below shows what happens in this scenario.




After N4 runs the algorithm from step 2, it will look like this.




Having this understanding, let's go into the implementation details. The first thing we need is an API design for the lock.

In this case I'll implement it as a class named ResourceLock, which takes the resource name as a constructor argument and has 3 methods (a short usage sketch follows the list):

  1. acquire - acquire the lock for the given resource; if the lock is not available, wait until it is acquired
  2. release - release the acquired lock
  3. destroy - clean up resources such as connections
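As a quick usage sketch of that API (the ZooKeeper address, port and resource name below are illustrative), a caller would typically do something like this with the ResourceLock class shown in the next section:

// Hypothetical caller of the ResourceLock API below; exception handling kept minimal
public void useSharedResource() throws Exception {
    ResourceLock lock = new ResourceLock("127.0.0.1", 2181, "my-resource");
    lock.acquire();              // blocks until this client holds the lock
    try {
        // ... work on the shared resource ...
    } finally {
        lock.release();          // delete our lock node so the next waiter can proceed
        lock.destroy();          // close the ZooKeeper connection used by this lock
    }
}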


Following is a sample Java implementation of this lock that I wrote. In this case I wanted the lock to be reusable within an application across multiple threads, instead of creating and closing a ZooKeeper connection each time it is used. So when a client releases the lock, it deletes that node from ZooKeeper instead of terminating the connection. I used a Java Semaphore to enforce the blocking behaviour that the API described above presents to the outside world.

Resource Lock Class :

package org.wickramarachchi.blog;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.wso2.zookeeper.sample.leader.coordination.CoordinationException;

import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Semaphore;

public class ResourceLock {

    private final Object lock = new Object();

    private String myNode;
    private String myZNode = null;
    private int myId;

    private ZooKeeper zooKeeper = null;

    private String address;
    private int port;
    private String resource;

    public static String RESOURCE_LOCK_PARENT = "/resource_lock_parent";

    public static String NODE_SEPARATOR = "/";

    public static String RESOURCE_LOCK_NODE = "/resource_lock_node";

    /**
     * Creates a distributed lock for a resource.
     *
     * @param address  zookeeper instance host name
     * @param port     zookeeper instance port
     * @param resource resource name
     */
    public ResourceLock(String address, int port, String resource) {
        this.address = address;
        this.port = port;
        this.resource = resource;
    }


    /**
     * Acquire the lock for the resource. This call blocks until the lock
     * for the resource is obtained.
     */
    public void acquire() throws InterruptedException, CoordinationException {
        try {
            if (zooKeeper == null) {
                synchronized (lock) {
                    if (zooKeeper == null) {
                        System.out.println("Starting ZooKeeper agent for host : " + address + " port : " + port);
                        // Connect using a host:port connect string; 30000 ms is an arbitrary session timeout
                        zooKeeper = new ZooKeeper(address + ":" + port, 30000, null);
                        System.out.println("ZooKeeper agent started successfully and connected to " + address + ":"
                                + port);
                        try {
                            // Create the persistent parent node for this resource if it does not exist yet
                            if (zooKeeper.exists(RESOURCE_LOCK_PARENT + "_" + resource,
                                    false) == null) {
                                zooKeeper.create(RESOURCE_LOCK_PARENT + "_" + resource,
                                        new byte[0],
                                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                            }
                        } catch (Exception e) {
                            String msg = "Error while creating the resource lock parent node at " +
                                    RESOURCE_LOCK_PARENT + "_" + resource;
                            e.printStackTrace();
                            throw new CoordinationException(msg, e);
                        }
                    }
                }
            }
            createNode();
            proceed();
        } catch (Exception e) {
            throw new CoordinationException("Error Acquiring Lock ", e);
        }
    }


    /**
     * Release the lock for the resource.
     *
     * @throws CoordinationException
     */
    public void release() throws CoordinationException {
        try {
            deleteNode();
        } catch (Exception e) {
            throw new CoordinationException("Error while releasing lock", e);
        }
    }


    private void createNode() throws InterruptedException, KeeperException {
        // Node name carries a guid so this client can recognise its own node later
        final String nodeName = RESOURCE_LOCK_NODE +
                UUID.randomUUID().toString().replace("-", "_");
        this.myNode = nodeName.replace("/", "");
        String path = RESOURCE_LOCK_PARENT + "_" + resource + nodeName;
        // EPHEMERAL_SEQUENTIAL: ZooKeeper appends the sequence number and removes the
        // node automatically if this client's session terminates
        zooKeeper.create(path, new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }


    private void deleteNode() throws InterruptedException, KeeperException {
        if (zooKeeper != null) {
            String path = RESOURCE_LOCK_PARENT + "_" + resource +
                    NODE_SEPARATOR + myZNode;
            zooKeeper.delete(path, -1);
        }
    }


    private List<String> getChildren() throws InterruptedException, KeeperException {
        return zooKeeper.getChildren(RESOURCE_LOCK_PARENT + "_" + resource, false);
    }


    private boolean proceed() throws InterruptedException, KeeperException {

        while (true) {
            // Semaphore used to block this thread until the node we watch is deleted
            final Semaphore lock = new Semaphore(1);
            lock.acquire();
            List<String> childNodes = getChildren();
            HashMap<Integer, String> nodeIdMap = new HashMap<Integer, String>();

            String selectedNode = null;
            int currentMin = Integer.MAX_VALUE;
            for (String child : childNodes) {
                // All children share the same name length, so the suffix is the sequence number
                String id = child.substring(myNode.length());
                int seqNumber = Integer.parseInt(id);
                if (child.contains(myNode)) {
                    myId = seqNumber;
                    myZNode = child;
                }
                nodeIdMap.put(seqNumber, child);
                if (seqNumber < currentMin) {
                    selectedNode = child;
                    currentMin = seqNumber;
                }
            }

            assert selectedNode != null;
            if (selectedNode.contains(myNode)) {
                // Our node has the lowest sequence number, so we hold the lock
                System.out.println("Lock acquired..");
                break;
            } else {
                // Watch the node with the next lowest sequence number below ours
                // (not necessarily myId - 1, since earlier nodes may already be gone)
                int myLockHolder = -1;
                for (Integer seq : nodeIdMap.keySet()) {
                    if (seq < myId && seq > myLockHolder) {
                        myLockHolder = seq;
                    }
                }
                Stat stat = zooKeeper.exists(RESOURCE_LOCK_PARENT +
                        "_" + resource + NODE_SEPARATOR +
                        nodeIdMap.get(myLockHolder),
                        new Watcher() {

                            @Override
                            public void process(WatchedEvent watchedEvent) {
                                if (Event.EventType.NodeDeleted == watchedEvent.getType()) {
                                    System.out.println("Lock release detected.. Trying to acquire lock again..");
                                    lock.release();
                                }
                            }
                        });

                if (stat == null) {
                    // The watched node disappeared between getChildren() and exists(); retry
                    System.out.println("Lock release detected.. Trying to acquire lock again..");
                    continue;
                }

                // Block until the watcher above fires for the deletion of the watched node
                lock.acquire();
            }
        }

        return true;
    }

    /**
     * Clean up the ZooKeeper resources allocated for this lock.
     */
    public void destroy() throws CoordinationException {
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            throw new CoordinationException("Error while destroying the resource lock ", e);
        } finally {
            zooKeeper = null;
        }
    }

}




Below is a test class for this lock. Clients on different threads try to access the same resource, but the lock makes sure that only one process holds the resource at any given time.

package org.wickramarachchi.blog;



public class LockTest {

    public static void main(String[] args) throws Exception {
        new LockTest().run();
    }

    public void run() throws Exception {
        Thread t1 = new Thread(new Process(1));
        Thread t2 = new Thread(new Process(2));
        Thread t3 = new Thread(new Process(3));
        Thread t4 = new Thread(new Process(4));

        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }

    class Process implements Runnable {

        int id;

        public Process(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            try {
                String resource = "resource";
                ResourceLock lock = new ResourceLock("127.0.0.1", 2181, resource);

                System.out.println(id + " Acquiring Lock");
                lock.acquire();
                System.out.println(id + " Got Lock");

                Thread.sleep(500);

                System.out.println(id + " Releasing Lock");
                lock.release();
                System.out.println(id + " Released Lock");
                lock.destroy();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}