Coverage Report - org.kuali.rice.ksb.messaging.MessageServiceInvoker
 
Classes in this File Line Coverage Branch Coverage Complexity
MessageServiceInvoker
5%
5/94
0%
0/28
2.588
MessageServiceInvoker$1
0%
0/10
N/A
2.588
 
 1  
 /*
 2  
  * Copyright 2006-2011 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 
 17  
 package org.kuali.rice.ksb.messaging;
 18  
 
 19  
 import java.io.Serializable;
 20  
 import java.lang.reflect.Method;
 21  
 import java.sql.Timestamp;
 22  
 
 23  
 import javax.xml.namespace.QName;
 24  
 
 25  
 import org.apache.log4j.Logger;
 26  
 import org.kuali.rice.core.api.config.property.ConfigContext;
 27  
 import org.kuali.rice.core.api.exception.RiceRuntimeException;
 28  
 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
 29  
 import org.kuali.rice.ksb.api.bus.Endpoint;
 30  
 import org.kuali.rice.ksb.api.bus.ServiceBus;
 31  
 import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
 32  
 import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
 33  
 import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
 34  
 import org.kuali.rice.ksb.service.KSBServiceLocator;
 35  
 import org.kuali.rice.ksb.util.KSBConstants;
 36  
 import org.springframework.transaction.TransactionStatus;
 37  
 import org.springframework.transaction.support.TransactionCallback;
 38  
 
 39  
 /**
 40  
  * Handles invocation of a {@link PersistedMessageBO}.
 41  
  * 
 42  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 43  
  */
 44  
 public class MessageServiceInvoker implements Runnable {
 45  
 
 46  1
     protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class);
 47  
 
 48  
     private PersistedMessageBO message;
 49  
     private Object service;
 50  
     private AsynchronousCall methodCall;
 51  
 
 52  6
     public MessageServiceInvoker(PersistedMessageBO message) {
 53  6
         this.message = message;
 54  6
     }
 55  
 
 56  
     public void run() {
 57  0
         LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId());
 58  0
         Object result = null;
 59  
         try {
 60  0
             result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() {
 61  
                 public Object doInTransaction(TransactionStatus status) {
 62  0
                     AsynchronousCall methodCall = getMessage().getPayload().getMethodCall();
 63  0
                     Object result = null;
 64  
                     try {
 65  0
                         result = invokeService(methodCall);
 66  0
                         KSBServiceLocator.getMessageQueueService().delete(getMessage());
 67  0
                     } catch (Throwable t) {
 68  0
                         LOG.warn("Caught throwable making async service call " + methodCall, t);
 69  0
                         throw new MessageProcessingException(t);
 70  0
                     }
 71  0
                     return result;
 72  
                 }
 73  
             });
 74  0
         } catch (Throwable t) {
 75  
                 // if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code
 76  
                 // however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well
 77  0
                 boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING));
 78  0
                       if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
 79  0
                               if (t instanceof RuntimeException) {
 80  0
                                       throw (RuntimeException)t;
 81  
                               }
 82  0
                               throw new RiceRuntimeException(t);
 83  
                       } else {
 84  0
                               placeInExceptionRouting(t, getMethodCall(), getService());
 85  
                       }
 86  
         } finally {
 87  0
             try {
 88  0
                 notifyOnCallback(methodCall, result);
 89  0
             } catch (Exception e) {
 90  0
                 LOG.warn("Exception caught notifying callback", e);
 91  0
             }
 92  
             try {
 93  0
                 notifyGlobalCallbacks(methodCall, result);
 94  0
             } catch (Exception e) {
 95  0
                 LOG.warn("Exception caught notifying callback", e);
 96  0
             }
 97  0
         }
 98  0
     }
 99  
 
 100  
     /**
 101  
      * Executed when an exception is encountered during message invocation.
 102  
      * Attempts to call the ExceptionHandler for the message, if that fails it
 103  
      * will attempt to set the status of the message in the queue to
 104  
      * "EXCEPTION".
 105  
      */
 106  
     protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) {
 107  0
         LOG.error("Error processing message: " + this.message, t);
 108  
         final Throwable throwable;
 109  0
         if (t instanceof MessageProcessingException) {
 110  0
             throwable = t.getCause();
 111  
         } else {
 112  0
             throwable = t;
 113  
         }
 114  
         try {
 115  
                 try {
 116  0
                         KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service);
 117  0
                 } catch (Throwable t1) {
 118  0
                         KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service);
 119  0
                 }
 120  0
         } catch (Throwable t2) {
 121  0
             LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2);
 122  0
             message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
 123  0
             message.setQueueDate(new Timestamp(System.currentTimeMillis()));
 124  
             try {
 125  0
                 KSBServiceLocator.getMessageQueueService().save(message);
 126  0
             } catch (Throwable t3) {
 127  0
                 LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3);
 128  0
             }
 129  0
         }
 130  0
     }
 131  
 
 132  
     /**
 133  
      * Invokes the AsynchronousCall represented on the methodCall on the service
 134  
      * contained in the ServiceInfo object on the AsynchronousCall.
 135  
      * 
 136  
      */
 137  
     protected Object invokeService(AsynchronousCall methodCall) throws Exception {
 138  0
         this.methodCall = methodCall;
 139  0
         ServiceConfiguration serviceConfiguration = methodCall.getServiceConfiguration();
 140  0
         QName serviceName = serviceConfiguration.getServiceName();
 141  0
         if (LOG.isDebugEnabled()) {
 142  0
             LOG.debug("Attempting to call service " + serviceName);
 143  
         }
 144  
 
 145  0
         Object service = getService(serviceConfiguration);
 146  0
         if (service == null) {
 147  0
                 throw new RiceRuntimeException("Failed to locate service endpoint for message: " + serviceConfiguration);
 148  
         }
 149  0
         Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes());
 150  0
         return method.invoke(service, methodCall.getArguments());
 151  
     }
 152  
 
 153  
     protected Object getService(ServiceConfiguration serviceConfiguration) {
 154  
         Object service;
 155  0
         if (serviceConfiguration.isQueue()) {
 156  0
             service = getQueueService(serviceConfiguration);
 157  
         } else {
 158  0
             service = getTopicService(serviceConfiguration);
 159  
         }
 160  0
         return service;
 161  
     }
 162  
 
 163  
     /**
 164  
      * Get the service as a topic. This means we want to contact every service
 165  
      * that is a part of this topic. We've grabbed all the services that are a
 166  
      * part of this topic and we want to make sure that we get everyone of them =
 167  
      * that is we want to circumvent loadbalancing and therefore not ask for the
 168  
      * service by it's name but the url to get the exact service we want.
 169  
      * 
 170  
      * @param serviceInfo
 171  
      * @return
 172  
      */
 173  
     protected Object getTopicService(ServiceConfiguration serviceConfiguration) {
 174  
         // get the service locally if we have it so we don't go through any
 175  
         // remoting
 176  0
         ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
 177  0
         Endpoint endpoint = serviceBus.getConfiguredEndpoint(serviceConfiguration);
 178  0
         if (endpoint == null) {
 179  0
                 return null;
 180  
         }
 181  0
         return endpoint.getService();
 182  
     }
 183  
 
 184  
     /**
 185  
      * Because this is a queue we just need to grab one.
 186  
      * 
 187  
      * @param serviceInfo
 188  
      * @return
 189  
      */
 190  
     protected Object getQueueService(ServiceConfiguration serviceConfiguration) {
 191  0
             ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
 192  0
             return serviceBus.getService(serviceConfiguration.getServiceName());
 193  
     }
 194  
 
 195  
     /**
 196  
      * Used in case the thread that dumped this work into the queue is waiting
 197  
      * for the work to be done to continue processing.
 198  
      * 
 199  
      * @param callback
 200  
      */
 201  
     protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) {
 202  0
         AsynchronousCallback callback = methodCall.getCallback();
 203  0
         notifyOnCallback(methodCall, callback, callResult);
 204  0
     }
 205  
 
 206  
     protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) {
 207  0
         if (LOG.isDebugEnabled()) {
 208  0
             LOG.debug("Notifying global callbacks");
 209  
         }
 210  0
         for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) {
 211  0
             notifyOnCallback(methodCall, globalCallBack, callResult);
 212  
         }
 213  0
     }
 214  
 
 215  
     protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) {
 216  0
         if (callback != null) {
 217  
             try {
 218  0
                 synchronized (callback) {
 219  0
                     if (LOG.isDebugEnabled()) {
 220  0
                         LOG.debug("Notifying callback " + callback + " with callResult " + callResult);
 221  
                     }
 222  0
                     callback.notifyAll();
 223  0
                     if (callResult instanceof Serializable || callResult == null) {
 224  0
                         callback.callback((Serializable) callResult, methodCall);
 225  
                     } else {
 226  
                         // may never happen
 227  0
                         LOG.warn("Attempted to call callback with non-serializable object.");
 228  
                     }
 229  0
                 }
 230  0
             } catch (Throwable t) {
 231  0
                 LOG.error("Caught throwable from callback object " + callback.getClass(), t);
 232  0
             }
 233  
         }
 234  0
     }
 235  
 
 236  
     public PersistedMessageBO getMessage() {
 237  6
         return this.message;
 238  
     }
 239  
 
 240  
     public void setMessage(PersistedMessageBO message) {
 241  0
         this.message = message;
 242  0
     }
 243  
 
 244  
     public Object getService() {
 245  0
         return this.service;
 246  
     }
 247  
 
 248  
     public AsynchronousCall getMethodCall() {
 249  0
         return this.methodCall;
 250  
     }
 251  
 
 252  
     public void setMethodCall(AsynchronousCall methodCall) {
 253  0
         this.methodCall = methodCall;
 254  0
     }
 255  
 
 256  
     public void setService(Object service) {
 257  0
         this.service = service;
 258  0
     }
 259  
 }