Monday, April 9, 2012

Distributed Coordination with Apache Zookeeper - A Java Approch

Apache Zookeeper is an Open source framework which can be used for distributed coordination. To understand what Apache zookeeper does we will have to go back to the Concurrent Programming class we did at university. Think how we tried to do coordination among different Threads or processors We used global variables most of the times.

Actually there are two ways to achieve coordination among processors

1) Message Passing
2) Shared Memory approach.


Now lets bring that problem to the next level. How to coordinate threads /processors that are running in different machines/nodes in a distributed system. The above two approaches are still the solution for this level as well. (Yes its a pattern in science :) )

There are lot of messaging frameworks and distributed shared memory abstractions available which can be used to achieve distributed coordination.

I'd like to think Apache zookeeper as a Coordination framework which gives a shared memory abstraction to the distributed processors by giving a global file system like data structure and operations to manage it. And also it will provide additional set of features that will ease the developers life who will write coordination algorithms like being able to subscribe for node changes etc...

Anyway bottom line is its just a global data structure that all processors/nodes will see. No MAGIC!!! You will have to write distributed coordination algorithms by yourself if you need to do coordination.

I was working on an effort on implementing Distributed Message Broker in which worked on creating some distributed coordination algorithms

We were using Apache zookeeper to handle some of the coordination tasks. Most of the coordination algorithms are specific to the problem domain we were working on like queue worker distribution balancing the load , handling node failures , node_id coordination etc.. But in this blog i’ll provide an example with a java code on how to implement a distributed resource lock using zookeeper. Which is one of the simplest and common use cases. Which i think is a good starting point for Java programmers who wants to start working with Zookeeper.

This is a generic problem and there is a nice recipe provided by zookeeper http://zookeeper.apache.org/doc/trunk/recipes.html. But it may be hard to understand in the first read. Even i had to explain it to few people on how it works. So i’ll explain the algorithm first and then shoot the code.

Following are the steps of the algorithm http://zookeeper.apache.org/doc/trunk/recipes.html#sc_recipes_Locks)

  1. Call create( ) with a pathname of "_locknode_/guid-lock-" and the sequence and ephemeral flags set. The guid is needed in case the create() result is missed. See the note below.
  2. Call getChildren( ) on the lock node without setting the watch flag (this is important to avoid the herd effect).
  3. If the pathname created in step 1 has the lowest sequence number suffix, the client has the lock and the client exits the protocol.
  4. The client calls exists( ) with the watch flag set on the path in the lock directory with the next lowest sequence number.
  5. if exists( ) returns false, go to step 2. Otherwise, wait for a notification for the pathname from the previous step before going to step 2.

Explaining more about the steps in the step one We create a node with name like *guid*-lock-0001 where guild is a unique id generated by the client which can be used by the client to later identify the node it created. The number at the end is the sequence number appended by zookeeper since we set the flag sequential . And also one thing to note is we have also set the ephemeral flag when creating the node. So that means this node will automatically get deleted once the client connection which created that node terminates.

In the steps 2 and 3 what we do is we get the all the lock nodes. and if our node is the node with the lowest sequence number(we can use guid to determine which node is created by us) that means we have the lock. So we can do our job and exit (Note that when we exit , our connection to zookeeper terminates and it will delete the node created by our client)

If our node is not the one with lowest sequence number our client will reach the 4th step in the algorithm. Which means they do not have the lock and they must wait till they acquire the lock.

So in the 4th step what we do is we do a exist() call on the node with the next lowest sequence number (to current node, not the 2nd lowest number in the node set ) and register a watch for that. Idea is to avoid one node being busy handling watches. So after that each client will be watching only one node and one node is watched by only one client.

See the image bellow. Here N1.. N4 are nodes created by clients and here N1 holds the lock.




So once the N1 release the lock (Since the client connection terminates ) that N1 node will be deleted which will be seen by N2 and since it rerun the algo from step2 it will be selected as the lock holder.

Here someone will ask a fair question , what if a client that created N3 dies ? That case also handled by this algorithm too. Say N3 Client dies so in that case N4 Will be notified and it will go to step 2 in the algo which will get All the children and relaise its not the leader and then it will again set a watch to N2 which the the next lowest node to it .

The below picture shows what happen in this scenario.




after N4 run the algo from Step2 it will look like this.




So having this understanding let’s go into implementation details. So 1st thing we need to have is a API design for the Lock.

In this case i’ll implement this as a Class Named ResourceLock which will take Resource Name as a constructor argument and have 3 other methods

  1. acquire - acquire the lock the given resource. If lock is not available wait till lock acquired
  2. release - release the acquired lock
  3. destroy - clean up resource like connections etc..


Following is a Sample Java implementation of this Lock I wrote. In this case i wanted to make this lock a reusable one within a application across multiple threads instead of creating and exiting zookeeper connection each time i reused it. So when a client release lock it will delete that node from Zookeeper instead of terminating the connection. I used Java Semaphores to enforce the behavior that is given to the outside world by the API as described above

Resource Lock Class :

package org.wickramarachchi.blog;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.wso2.zookeeper.sample.leader.coordination.CoordinationException;

import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Semaphore;

public class ResourceLock {

private final Object lock = new Object();

private String myNode;
private String myZNode = null;
private int myId;

private ZooKeeper zooKeeper = null;

private String address;
private int port;
private String resource;

public static String RESOURCE_LOCK_PARENT ="/resource_lock_parent";

public static String NODE_SEPARATOR = "/";


public static String RESOURCE_LOCK_NODE = "/resource_lock_node";

/**
* Creates a Distributed Lock for a resource
*
* @param address zookeeper instance host name
* @param port zookeeper instance port
* @param resource resource name
*/
public ResourceLock(String address, int port, String resource) {

this.address = address;
this.port = port;
this.resource = resource;

}


/**
* Acquire the lock for the resource. This will get blocked till the it get
* the lock for the resource
*
*/
public void acquire() throws InterruptedException, CoordinationException {
try {

if (zooKeeper == null) {
synchronized (lock) {
if (zooKeeper == null) {
System.out.println("Starting Zookeeper agent for host : " + address + " port : " + port);
zooKeeper = new ZooKeeper(address, port, null);
System.out.println("ZooKeeper agent started successfully and connected to " + address + ":"
+ port);
try {
if (zooKeeper.exists(RESOURCE_LOCK_PARENT + "_" + resource,
false) == null) {
zooKeeper.create(RESOURCE_LOCK_PARENT + "_" + resource,
new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

} catch (Exception e) {
String msg = "Error while creating Queue worker coordination parent at " +
RESOURCE_LOCK_PARENT + "_" + resource;
e.printStackTrace();
throw new CoordinationException(msg, e);
}
}
}
}
createNode();
proceed();


} catch (Exception e) {
throw new CoordinationException("Error Acquiring Lock ", e);
}


}


/**
* Release the Lock for the resource
*
* @throws CoordinationException
*/
public void release() throws CoordinationException {

try {
deleteNode();
} catch (Exception e) {
throw new CoordinationException("Error while releasing lock", e);
}

}


private void createNode() throws InterruptedException, KeeperException {


final String nodeName = RESOURCE_LOCK_NODE +
(UUID.randomUUID()).toString().replace("-", "_");
this.myNode = nodeName.replace("/", "");
String path = RESOURCE_LOCK_PARENT
+ "_" + resource + nodeName;
zooKeeper.create(path, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

}


private void deleteNode() throws InterruptedException, KeeperException {
if (zooKeeper != null) {
String path = RESOURCE_LOCK_PARENT + "_" + resource +
NODE_SEPARATOR + myZNode;
zooKeeper.delete(path, -1);
}
}


private List<String> getChildren() throws InterruptedException, KeeperException {

return zooKeeper.getChildren(RESOURCE_LOCK_PARENT + "_" + resource, false);
}


private boolean proceed() throws InterruptedException, KeeperException {

while (true) {
final Semaphore lock = new Semaphore(1);
lock.acquire();
List<String> childNodes = getChildren();
HashMap<Integer, String> nodeIdMap = new HashMap<Integer, String>();

String selectedNode = null;
int currentMin = Integer.MAX_VALUE;
for (String child : childNodes) {
String id = child.substring(myNode.length());
int seqNumber = Integer.parseInt(id);
if (child.contains(myNode)) {
myId = seqNumber;
myZNode = child;
}
nodeIdMap.put(seqNumber, child);
if (seqNumber < currentMin) {
selectedNode = child;
currentMin = seqNumber;
}
}

assert selectedNode != null;
if (selectedNode.contains(myNode)) {
System.out.println("Lock acquired..");
break;
} else {
int myLockHolder = --myId;
Stat stat = zooKeeper.exists(RESOURCE_LOCK_PARENT +
"_" + resource + NODE_SEPARATOR +
nodeIdMap.get(myLockHolder),
new Watcher() {

@Override
public void process(WatchedEvent watchedEvent) {
if (Event.EventType.NodeDeleted == watchedEvent.getType()) {
System.out.println("Locked Release Detected.. Trying to acquire lock again..");
lock.release();

}
}
});

if (stat == null) {
System.out.println("Locked Release Detected.. Trying to acquire lock again..");
continue;
}

lock.acquire();
}


}

return true;
}

/**
* Cleanup allocated Zookeeper Resources for this Lock
*/
public void destroy() throws CoordinationException {

try {
zooKeeper.close();
} catch (InterruptedException e) {
throw new CoordinationException("Error while releasing the Queue Lock ", e);
} finally {
zooKeeper = null;
}
}

}




Test Class to Test this lock. In this case Clients from different threads try to access the same resource but Lock Make sure that one process have the resource at a given time.

package org.wickramarachchi.blog;



public class LockTest {


public static void main(String[] args) throws Exception {
new LockTest().run();
}


public void run() throws Exception{
Thread t1 = new Thread(new Process(1));
Thread t2 = new Thread(new Process(2));
Thread t3 = new Thread(new Process(3));
Thread t4 = new Thread(new Process(4));

t1.start();
t2.start();
t3.start();
t4.start();
}


class Process implements Runnable{


int id;

public Process(int id ) {
this.id = id;
}
@Override
public void run() {
try {
String resource= "resource";
ResourceLock lock = new ResourceLock("127.0.0.1", 2181,resource);


System.out.println(id + " Acquiring Lock" );
lock.acquire();
System.out.println(id + "get Lock" );

Thread.sleep(500);

System.out.println(id + " Releasing Lock" );
lock.release();
System.out.println(id + " Released Lock" );
lock.destroy();

} catch (Exception e) {
e.printStackTrace();
}
}
}
}

2 comments:

kalyan kumar said...

what is the jar file for org.wso2.zookeeper.sample.leader.coordination.CoordinationException.

please help me.

Charith Wickramarachchi said...

Good catch. Just remove that import and replace throws in acquire() with Exception