Another message redelivery pattern with WSO2 ESB

In the articles Implementing Store and Forward Messaging Patterns With WSO2ESB Part 1 and Implementing Store and Forward Messaging Patterns With WSO2ESB Part 2, I introduced WSO2 Message Processors and Message Stores and explained how to implement store and forward messaging patterns with them.

In this blog post I'll show how to implement a redelivery messaging pattern I recently implemented. In the article Implementing Store and Forward Messaging Patterns With WSO2ESB Part 2 I described how to implement a redelivery pattern with the Message Forwarding Processor. This post covers another redelivery pattern, one that can't be achieved with the Message Forwarding Processor.

The requirement is as follows:

  1. A client invokes a backend service through the ESB.
  2. If the first invocation succeeds, send the response back to the client; if the service is not available, send a fault message back.
  3. On failure, retry delivering the message N times, with each retry happening after an interval T.
  4. After N failed attempts, drop the message (or store it somewhere).

Let's see how we can implement this using WSO2 ESB.

For this I'll be using WSO2 ESB mediation sequences, Message Stores and Message Processors. WSO2 mediation constructs are designed to act as a set of mediation building blocks from which we can compose different EIP/messaging patterns. So in this scenario we will compose sequences, mediators, Message Stores and Message Processors to build this messaging pattern.

Message Flow:

Success scenario:
  1. The proxy service accepts the message and sends it to the SendToService sequence.
  2. The SendToService sequence sends the message to the service.
  3. The response arrives at the proxy service's out sequence, which sends it back to the client.


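(Note: In this case the SendFault sequence is registered as the fault handler for the SendToService sequence. This means that if the backend service is not available, the message will go to the SendFault sequence. In the sample configuration further down, these two sequences are named SimpleSQSender and SimpleSQFault.)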

When the service is not available:
  1. The proxy service accepts the message and sends it to the SendToService sequence.
  2. The SendToService sequence sends the message to the service.
  3. Since the endpoint is down, the SendFault sequence is executed with the failed message.
  4. We check the message for a property named retry_count. If it is not there, this is the first failure, so we go to step 5. If retry_count is there, we go to step 6.
  5. Clone the message. In one clone thread we create a fault message and send it back to the client. In the other clone thread we set the property retry_count to 1 and store the message.
  6. We increment retry_count using an XPath function and check whether it is now greater than N. If so, we drop the message; if not, we store the message again.

Note: Here the Message Sampling Processor is configured to poll messages from the Message Store at every interval T and send them to the SendToService sequence.

Following is the flow chart of the message flow described above.

[Figure: flow chart of the message flow]

Below is a sample ESB configuration XML which implements this.

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
    <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
        <parameter name="cachableDuration">15000</parameter>
    </registry>
    <proxy name="InOnlyProxy" transports="https http" startOnLoad="true" trace="disable">
        <target>
            <inSequence>
                <property name="target.endpoint" value="SimpleStockQuoteService"/>
                <sequence key="SimpleSQSender"/>
            </inSequence>
            <outSequence>
                <send/>
            </outSequence>
        </target>
    </proxy>
    <endpoint name="SimpleStockQuoteService">
        <address uri="http://127.0.0.1:9000/services/SimpleStockQuoteService">
            <suspendOnFailure>
                <errorCodes>-1</errorCodes>
                <progressionFactor>1.0</progressionFactor>
            </suspendOnFailure>
            <markForSuspension>
                <errorCodes>-1</errorCodes>
            </markForSuspension>
        </address>
    </endpoint>
    <sequence name="SimpleSQFault">
        <log level="custom">
            <property xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope" xmlns:ns="http://org.apache.synapse/xsd" xmlns:ns3="http://org.apache.synapse/xsd" name="retry_count" expression="get-property('retry_count')"/>
        </log>
        <filter xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope" xmlns:ns="http://org.apache.synapse/xsd" xmlns:ns3="http://org.apache.synapse/xsd" xpath="get-property('retry_count')">
            <then>
                <property name="retry_count" expression="number(get-property('retry_count'))+1" scope="default"/>
                <!-- Give up once the retry limit N (10 here) is exceeded -->
                <filter xpath="get-property('retry_count') &gt; 10">
                    <then>
                        <log>
                            <property name="Dropping--Count" expression="get-property('retry_count')"/>
                        </log>
                        <drop/>
                    </then>
                </filter>
            </then>
            <else>
                <property name="retry_count" value="1" scope="default"/>
                <clone continueParent="true">
                    <target>
                        <sequence>
                            <makefault version="soap11">
                                <code xmlns:tns="http://www.w3.org/2003/05/soap-envelope" value="tns:Receiver"/>
                                <reason expression="get-property('ERROR_MESSAGE')"/>
                            </makefault>
                            <send/>
                            <drop/>
                        </sequence>
                    </target>
                </clone>
            </else>
        </filter>
        <!-- Not dropped above: store the message so the processor can retry it -->
        <clone>
            <target>
                <sequence>
                    <store messageStore="SimpleMS"/>
                </sequence>
            </target>
        </clone>
    </sequence>
    <sequence name="SimpleSQSender" onError="SimpleSQFault">
        <send>
            <endpoint key="SimpleStockQuoteService"/>
        </send>
    </sequence>
    <sequence name="fault">
        <log level="full">
            <property name="MESSAGE" value="Executing default 'fault' sequence"/>
            <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
            <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
        </log>
        <drop/>
    </sequence>
    <sequence name="main">
        <in>
            <log level="full"/>
            <filter source="get-property('To')" regex="http://localhost:9000.*">
                <send/>
            </filter>
        </in>
        <out>
            <send/>
        </out>
        <description>The main sequence for the message mediation</description>
    </sequence>
    <messageStore name="SimpleMS"/>
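    <!-- Polls SimpleMS at the configured interval (T) and replays each message through SimpleSQSender -->
    <messageProcessor class="org.apache.synapse.message.processors.sampler.SamplingProcessor" name="Processor" messageStore="SimpleMS">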
        <parameter name="interval">3000</parameter>
        <parameter name="sequence">SimpleSQSender</parameter>
    </messageProcessor>
</definitions>

For this I have used the SimpleStockQuoteService that comes with the WSO2 ESB samples as the backend service. This configuration drops the message after N retries (N is 10 here), but we can use the store mediator to store it in another message store if needed.
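
As a rough sketch of that variation, the inner filter of the SimpleSQFault sequence could store the message in a second message store before dropping it. The store name FailedMS is a hypothetical name chosen for this example and is not part of the configuration above. No message processor is attached to it, so messages would stay there until they are handled some other way.

```xml
<!-- Inside SimpleSQFault: replaces the inner retry-limit filter -->
<filter xpath="get-property('retry_count') &gt; 10">
    <then>
        <log>
            <property name="Storing-Failed--Count" expression="get-property('retry_count')"/>
        </log>
        <!-- Keep the failed message in a separate store (FailedMS is a hypothetical name) -->
        <store messageStore="FailedMS"/>
        <!-- Stop mediation here so the message is not stored in SimpleMS again -->
        <drop/>
    </then>
</filter>

<!-- At the top level of the configuration, next to SimpleMS -->
<messageStore name="FailedMS"/>
```

The drop mediator is kept after the store mediator so that the message does not fall through to the final clone and get queued for another retry.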

