Coverage Report - org.kuali.rice.ksb.messaging.MessageServiceInvoker
 
Classes in this File Line Coverage Branch Coverage Complexity
MessageServiceInvoker
0%
0/94
0%
0/28
2.588
MessageServiceInvoker$1
0%
0/10
N/A
2.588
 
 1  
 /**
 2  
  * Copyright 2005-2011 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.kuali.rice.ksb.messaging;
 17  
 
 18  
 import java.io.Serializable;
 19  
 import java.lang.reflect.Method;
 20  
 import java.sql.Timestamp;
 21  
 
 22  
 import javax.xml.namespace.QName;
 23  
 
 24  
 import org.apache.log4j.Logger;
 25  
 import org.kuali.rice.core.api.config.property.ConfigContext;
 26  
 import org.kuali.rice.core.api.exception.RiceRuntimeException;
 27  
 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
 28  
 import org.kuali.rice.ksb.api.bus.Endpoint;
 29  
 import org.kuali.rice.ksb.api.bus.ServiceBus;
 30  
 import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
 31  
 import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
 32  
 import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
 33  
 import org.kuali.rice.ksb.service.KSBServiceLocator;
 34  
 import org.kuali.rice.ksb.util.KSBConstants;
 35  
 import org.springframework.transaction.TransactionStatus;
 36  
 import org.springframework.transaction.support.TransactionCallback;
 37  
 
 38  
 /**
 39  
  * Handles invocation of a {@link PersistedMessageBO}.
 40  
  * 
 41  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 42  
  */
 43  
 public class MessageServiceInvoker implements Runnable {
 44  
 
 45  0
     protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class);
 46  
 
 47  
     private PersistedMessageBO message;
 48  
     private Object service;
 49  
     private AsynchronousCall methodCall;
 50  
 
 51  0
     public MessageServiceInvoker(PersistedMessageBO message) {
 52  0
         this.message = message;
 53  0
     }
 54  
 
 55  
     public void run() {
 56  0
         LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId());
 57  0
         Object result = null;
 58  
         try {
 59  0
             result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() {
 60  
                 public Object doInTransaction(TransactionStatus status) {
 61  0
                     AsynchronousCall methodCall = getMessage().getPayload().getMethodCall();
 62  0
                     Object result = null;
 63  
                     try {
 64  0
                         result = invokeService(methodCall);
 65  0
                         KSBServiceLocator.getMessageQueueService().delete(getMessage());
 66  0
                     } catch (Throwable t) {
 67  0
                         LOG.warn("Caught throwable making async service call " + methodCall, t);
 68  0
                         throw new MessageProcessingException(t);
 69  0
                     }
 70  0
                     return result;
 71  
                 }
 72  
             });
 73  0
         } catch (Throwable t) {
 74  
                 // if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code
 75  
                 // however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well
 76  0
                 boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING));
 77  0
                       if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
 78  0
                               if (t instanceof RuntimeException) {
 79  0
                                       throw (RuntimeException)t;
 80  
                               }
 81  0
                               throw new RiceRuntimeException(t);
 82  
                       } else {
 83  0
                               placeInExceptionRouting(t, getMethodCall(), getService());
 84  
                       }
 85  
         } finally {
 86  0
             try {
 87  0
                 notifyOnCallback(methodCall, result);
 88  0
             } catch (Exception e) {
 89  0
                 LOG.warn("Exception caught notifying callback", e);
 90  0
             }
 91  
             try {
 92  0
                 notifyGlobalCallbacks(methodCall, result);
 93  0
             } catch (Exception e) {
 94  0
                 LOG.warn("Exception caught notifying callback", e);
 95  0
             }
 96  0
         }
 97  0
     }
 98  
 
 99  
     /**
 100  
      * Executed when an exception is encountered during message invocation.
 101  
      * Attempts to call the ExceptionHandler for the message, if that fails it
 102  
      * will attempt to set the status of the message in the queue to
 103  
      * "EXCEPTION".
 104  
      */
 105  
     protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) {
 106  0
         LOG.error("Error processing message: " + this.message, t);
 107  
         final Throwable throwable;
 108  0
         if (t instanceof MessageProcessingException) {
 109  0
             throwable = t.getCause();
 110  
         } else {
 111  0
             throwable = t;
 112  
         }
 113  
         try {
 114  
                 try {
 115  0
                         KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service);
 116  0
                 } catch (Throwable t1) {
 117  0
                         KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service);
 118  0
                 }
 119  0
         } catch (Throwable t2) {
 120  0
             LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2);
 121  0
             message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
 122  0
             message.setQueueDate(new Timestamp(System.currentTimeMillis()));
 123  
             try {
 124  0
                 KSBServiceLocator.getMessageQueueService().save(message);
 125  0
             } catch (Throwable t3) {
 126  0
                 LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3);
 127  0
             }
 128  0
         }
 129  0
     }
 130  
 
 131  
     /**
 132  
      * Invokes the AsynchronousCall represented on the methodCall on the service
 133  
      * contained in the ServiceInfo object on the AsynchronousCall.
 134  
      * 
 135  
      */
 136  
     protected Object invokeService(AsynchronousCall methodCall) throws Exception {
 137  0
         this.methodCall = methodCall;
 138  0
         ServiceConfiguration serviceConfiguration = methodCall.getServiceConfiguration();
 139  0
         QName serviceName = serviceConfiguration.getServiceName();
 140  0
         if (LOG.isDebugEnabled()) {
 141  0
             LOG.debug("Attempting to call service " + serviceName);
 142  
         }
 143  
 
 144  0
         Object service = getService(serviceConfiguration);
 145  0
         if (service == null) {
 146  0
                 throw new RiceRuntimeException("Failed to locate service endpoint for message: " + serviceConfiguration);
 147  
         }
 148  0
         Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes());
 149  0
         return method.invoke(service, methodCall.getArguments());
 150  
     }
 151  
 
 152  
     protected Object getService(ServiceConfiguration serviceConfiguration) {
 153  
         Object service;
 154  0
         if (serviceConfiguration.isQueue()) {
 155  0
             service = getQueueService(serviceConfiguration);
 156  
         } else {
 157  0
             service = getTopicService(serviceConfiguration);
 158  
         }
 159  0
         return service;
 160  
     }
 161  
 
 162  
     /**
 163  
      * Get the service as a topic. This means we want to contact every service
 164  
      * that is a part of this topic. We've grabbed all the services that are a
 165  
      * part of this topic and we want to make sure that we get everyone of them =
 166  
      * that is we want to circumvent loadbalancing and therefore not ask for the
 167  
      * service by it's name but the url to get the exact service we want.
 168  
      * 
 169  
      * @param serviceInfo
 170  
      * @return
 171  
      */
 172  
     protected Object getTopicService(ServiceConfiguration serviceConfiguration) {
 173  
         // get the service locally if we have it so we don't go through any
 174  
         // remoting
 175  0
         ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
 176  0
         Endpoint endpoint = serviceBus.getConfiguredEndpoint(serviceConfiguration);
 177  0
         if (endpoint == null) {
 178  0
                 return null;
 179  
         }
 180  0
         return endpoint.getService();
 181  
     }
 182  
 
 183  
     /**
 184  
      * Because this is a queue we just need to grab one.
 185  
      * 
 186  
      * @param serviceInfo
 187  
      * @return
 188  
      */
 189  
     protected Object getQueueService(ServiceConfiguration serviceConfiguration) {
 190  0
             ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
 191  0
             return serviceBus.getService(serviceConfiguration.getServiceName());
 192  
     }
 193  
 
 194  
     /**
 195  
      * Used in case the thread that dumped this work into the queue is waiting
 196  
      * for the work to be done to continue processing.
 197  
      * 
 198  
      * @param callback
 199  
      */
 200  
     protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) {
 201  0
         AsynchronousCallback callback = methodCall.getCallback();
 202  0
         notifyOnCallback(methodCall, callback, callResult);
 203  0
     }
 204  
 
 205  
     protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) {
 206  0
         if (LOG.isDebugEnabled()) {
 207  0
             LOG.debug("Notifying global callbacks");
 208  
         }
 209  0
         for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) {
 210  0
             notifyOnCallback(methodCall, globalCallBack, callResult);
 211  
         }
 212  0
     }
 213  
 
 214  
     protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) {
 215  0
         if (callback != null) {
 216  
             try {
 217  0
                 synchronized (callback) {
 218  0
                     if (LOG.isDebugEnabled()) {
 219  0
                         LOG.debug("Notifying callback " + callback + " with callResult " + callResult);
 220  
                     }
 221  0
                     callback.notifyAll();
 222  0
                     if (callResult instanceof Serializable || callResult == null) {
 223  0
                         callback.callback((Serializable) callResult, methodCall);
 224  
                     } else {
 225  
                         // may never happen
 226  0
                         LOG.warn("Attempted to call callback with non-serializable object.");
 227  
                     }
 228  0
                 }
 229  0
             } catch (Throwable t) {
 230  0
                 LOG.error("Caught throwable from callback object " + callback.getClass(), t);
 231  0
             }
 232  
         }
 233  0
     }
 234  
 
 235  
     public PersistedMessageBO getMessage() {
 236  0
         return this.message;
 237  
     }
 238  
 
 239  
     public void setMessage(PersistedMessageBO message) {
 240  0
         this.message = message;
 241  0
     }
 242  
 
 243  
     public Object getService() {
 244  0
         return this.service;
 245  
     }
 246  
 
 247  
     public AsynchronousCall getMethodCall() {
 248  0
         return this.methodCall;
 249  
     }
 250  
 
 251  
     public void setMethodCall(AsynchronousCall methodCall) {
 252  0
         this.methodCall = methodCall;
 253  0
     }
 254  
 
 255  
     public void setService(Object service) {
 256  0
         this.service = service;
 257  0
     }
 258  
 }