Monday, September 8, 2014

ESB Guaranteed Delivery with client acknowledgement for synchronous clients

A typical guaranteed delivery scenario, as covered by the Enterprise Integration Patterns, ensures the delivery of a message to a given backend endpoint. Under this pattern, the ESB attempts to deliver the message to the backend endpoint and, if delivery fails, stores the message and delivers it once the backend endpoint becomes available. If this mechanism is applied to a synchronous message flow, the client would wait for a response from the ESB until the request times out. To accommodate synchronous clients in a guaranteed delivery scenario, the ESB can respond to the client with a fault message when the message could not be delivered and has been stored for later delivery. The following diagram depicts the message flow of the guaranteed delivery scenario with client acknowledgement using the WSO2 ESB.




1.      The client sends the message to the ESB.
2.      The ESB tries to deliver the message to the backend system, and the delivery fails.
3.      The ESB stores the message in a message queue.
4.      The ESB sends a response back to the client (the response can be customized as required).
5.      The ESB picks the message from the queue and tries to deliver it to the backend system; the number of retry attempts can be customized.



The following Synapse configuration implements this scenario. It includes the proxy service, message store, message processor, sequences and endpoints that make up the mediation flow. Note that the configuration stores messages in an ActiveMQ-based message queue, so ActiveMQ must first be set up with WSO2 ESB by following the steps in [1].

<proxy name="StockQuoteProxy"
          transports="https http"
          startOnLoad="true"
          trace="disable">
      <description/>
      <target>
         <inSequence>
            <sequence key="delivery_seq"/>
         </inSequence>
         <outSequence>
            <send/>
         </outSequence>
      </target>
      <publishWSDL uri="file:repository/samples/resources/proxy/sample_proxy_1.wsdl"/>
   </proxy>
   <endpoint name="queueEP">
      <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
   </endpoint>
   <endpoint name="StockQuoteFO">
      <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
   </endpoint>
   <sequence name="delivery_seq" onError="errorHandler">
      <enrich>
         <source type="envelope" clone="true"/>
         <target type="property" property="mssg"/>
      </enrich>
      <send>
         <endpoint key="StockQuoteFO"/>
      </send>
   </sequence>
   <sequence name="SendResponse">
      <header name="To" action="remove"/>
      <property name="RESPONSE" value="true"/>
      <send/>
   </sequence>
   <sequence name="errorHandler">
      <makefault version="soap11">
         <code xmlns:tns="http://www.w3.org/2003/05/soap-envelope" value="tns:Receiver"/>
         <reason value="Message has been stored."/>
      </makefault>
      <clone>
         <target sequence="SendResponse"/>
         <target sequence="queueMessage"/>
      </clone>
   </sequence>
  
   <sequence name="queueMessage">
      <enrich>
         <source type="property" clone="true" property="mssg"/>
         <target type="envelope"/>
      </enrich>
      <property name="target.endpoint" value="queueEP"/>
      <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
      <property name="OUT_ONLY" value="true"/>
      <store messageStore="JMStore"/>
   </sequence>
  
   <messageStore class="org.apache.synapse.message.store.impl.jms.JmsStore"
                 name="JMStore">
      <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
      <parameter name="store.jms.cache.connection">false</parameter>
      <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
      <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
   </messageStore>
   <messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
                     name="ScheduledProcessor"
                     messageStore="JMStore">
      <parameter name="interval">20000</parameter>
      <parameter name="is.active">true</parameter>

   </messageProcessor>
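
The message processor above sets only the polling interval, so the number of redelivery attempts is left at its default. As a minimal sketch of the retry customization mentioned in step 5, the same processor could be given a max.delivery.attempts parameter. The parameter name is the one documented for the WSO2 ESB message forwarding processor; the value 4 is purely illustrative, and the behaviour once the limit is reached should be checked against the documentation for the ESB version in use.

   <!-- Sketch: same processor as above, with an assumed retry limit of 4 attempts -->
   <messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
                     name="ScheduledProcessor"
                     messageStore="JMStore">
      <!-- Poll the store every 20 seconds, as in the original configuration -->
      <parameter name="interval">20000</parameter>
      <!-- Illustrative value; not part of the original configuration -->
      <parameter name="max.delivery.attempts">4</parameter>
      <parameter name="is.active">true</parameter>
   </messageProcessor>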

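The errorHandler sequence above builds a SOAP 1.1 fault but takes its fault code from the SOAP 1.2 envelope namespace. If the client expects a fault code from the SOAP 1.1 namespace, a variant along the following lines could be used instead. This is a sketch rather than part of the original configuration: the soap11Env prefix and the Server code are assumptions chosen for illustration, and the rest of the sequence is unchanged.

   <!-- Sketch: errorHandler with a fault code from the SOAP 1.1 envelope namespace -->
   <sequence name="errorHandler">
      <makefault version="soap11">
         <!-- Assumed code; pick whichever SOAP 1.1 fault code suits the client -->
         <code xmlns:soap11Env="http://schemas.xmlsoap.org/soap/envelope/" value="soap11Env:Server"/>
         <reason value="Message has been stored."/>
      </makefault>
      <clone>
         <target sequence="SendResponse"/>
         <target sequence="queueMessage"/>
      </clone>
   </sequence>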

[1] https://docs.wso2.com/display/ESB481/Configure+with+ActiveMQ