Coverage Report - org.kuali.rice.ksb.messaging.MessageServiceInvoker
 
Classes in this File Line Coverage Branch Coverage Complexity
MessageServiceInvoker
0%
0/107
0%
0/34
2.824
MessageServiceInvoker$1
0%
0/10
N/A
2.824
 
 1  
 /*
 2  
  * Copyright 2006-2011 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 
 17  
 package org.kuali.rice.ksb.messaging;
 18  
 
 19  
 import org.apache.log4j.Logger;
 20  
 import org.kuali.rice.core.api.config.property.ConfigContext;
 21  
 import org.kuali.rice.core.api.exception.RiceRuntimeException;
 22  
 import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader;
 23  
 import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader;
 24  
 import org.kuali.rice.ksb.messaging.callforwarding.ForwardedCallHandler;
 25  
 import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
 26  
 import org.kuali.rice.ksb.service.KSBServiceLocator;
 27  
 import org.kuali.rice.ksb.util.KSBConstants;
 28  
 import org.springframework.transaction.TransactionStatus;
 29  
 import org.springframework.transaction.support.TransactionCallback;
 30  
 
 31  
 import javax.xml.namespace.QName;
 32  
 import java.io.Serializable;
 33  
 import java.lang.reflect.Method;
 34  
 import java.sql.Timestamp;
 35  
 import java.util.List;
 36  
 
 37  
 /**
 38  
  * Handles invocation of a {@link PersistedMessageBO}.
 39  
  * 
 40  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 41  
  */
 42  
 public class MessageServiceInvoker implements Runnable {
 43  
 
 44  0
     protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class);
 45  
 
 46  
     private PersistedMessageBO message;
 47  
     private Object service;
 48  
     private AsynchronousCall methodCall;
 49  
 
 50  0
     public MessageServiceInvoker(PersistedMessageBO message) {
 51  0
         this.message = message;
 52  0
     }
 53  
 
 54  
     public void run() {
 55  0
         LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId());
 56  0
         Object result = null;
 57  
         try {
 58  0
             result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() {
 59  
                 public Object doInTransaction(TransactionStatus status) {
 60  0
                     AsynchronousCall methodCall = getMessage().getPayload().getMethodCall();
 61  0
                     Object result = null;
 62  
                     try {
 63  0
                         result = invokeService(methodCall);
 64  0
                         KSBServiceLocator.getRouteQueueService().delete(getMessage());
 65  0
                     } catch (Throwable t) {
 66  0
                         LOG.warn("Caught throwable making async service call " + methodCall, t);
 67  0
                         throw new MessageProcessingException(t);
 68  0
                     }
 69  0
                     return result;
 70  
                 }
 71  
             });
 72  0
         } catch (Throwable t) {
 73  
                 // if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code
 74  
                 // however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well
 75  0
                 boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING));
 76  0
                       if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
 77  0
                               if (t instanceof RuntimeException) {
 78  0
                                       throw (RuntimeException)t;
 79  
                               }
 80  0
                               throw new RiceRuntimeException(t);
 81  
                       } else {
 82  0
                               placeInExceptionRouting(t, getMethodCall(), getService());
 83  
                       }
 84  
         } finally {
 85  0
             try {
 86  0
                 notifyOnCallback(methodCall, result);
 87  0
             } catch (Exception e) {
 88  0
                 LOG.warn("Exception caught notifying callback", e);
 89  0
             }
 90  
             try {
 91  0
                 notifyGlobalCallbacks(methodCall, result);
 92  0
             } catch (Exception e) {
 93  0
                 LOG.warn("Exception caught notifying callback", e);
 94  0
             }
 95  0
         }
 96  0
     }
 97  
 
 98  
     /**
 99  
      * Executed when an exception is encountered during message invocation.
 100  
      * Attempts to call the ExceptionHandler for the message, if that fails it
 101  
      * will attempt to set the status of the message in the queue to
 102  
      * "EXCEPTION".
 103  
      */
 104  
     protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) {
 105  0
         LOG.error("Error processing message: " + this.message, t);
 106  
         final Throwable throwable;
 107  0
         if (t instanceof MessageProcessingException) {
 108  0
             throwable = t.getCause();
 109  
         } else {
 110  0
             throwable = t;
 111  
         }
 112  
         try {
 113  
                 try {
 114  0
                         KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service);
 115  0
                 } catch (Throwable t1) {
 116  0
                         KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service);
 117  0
                 }
 118  0
         } catch (Throwable t2) {
 119  0
             LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2);
 120  0
             message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
 121  0
             message.setQueueDate(new Timestamp(System.currentTimeMillis()));
 122  
             try {
 123  0
                 KSBServiceLocator.getRouteQueueService().save(message);
 124  0
             } catch (Throwable t3) {
 125  0
                 LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3);
 126  0
             }
 127  0
         }
 128  0
     }
 129  
 
 130  
     /**
 131  
      * Invokes the AsynchronousCall represented on the methodCall on the service
 132  
      * contained in the ServiceInfo object on the AsynchronousCall.
 133  
      * 
 134  
      */
 135  
     protected Object invokeService(AsynchronousCall methodCall) throws Exception {
 136  0
         this.methodCall = methodCall;
 137  0
         ServiceInfo serviceInfo = methodCall.getServiceInfo();
 138  0
         if (LOG.isDebugEnabled()) {
 139  0
             LOG.debug("Attempting to call service " + serviceInfo.getQname());
 140  
         }
 141  
 
 142  0
         if (ConfigContext.getCurrentContextConfig().getStoreAndForward() && !methodCall.isIgnoreStoreAndForward()) {
 143  0
             QName serviceName = serviceInfo.getQname();
 144  0
             RemoteResourceServiceLocator remoteResourceLocator = KSBResourceLoaderFactory.getRemoteResourceLocator();
 145  0
             QName storeAndForwardName = new QName(serviceName.getNamespaceURI(), serviceName.getLocalPart() + KSBConstants.FORWARD_HANDLER_SUFFIX);
 146  0
             List<RemotedServiceHolder> forwardServices = remoteResourceLocator.getAllServices(storeAndForwardName);
 147  0
             if (forwardServices.isEmpty()) {
 148  0
                 LOG.warn("Could not find store and forward service " + storeAndForwardName + ".  Defaulting to regular messaging.");
 149  
             } else {
 150  0
                 serviceInfo = forwardServices.get(0).getServiceInfo();
 151  
             }
 152  0
             ForwardedCallHandler service = (ForwardedCallHandler) getService(serviceInfo);
 153  0
             this.message.setMethodCall(methodCall);
 154  0
             service.handleCall(this.message);
 155  0
             return null;
 156  
         }
 157  
 
 158  0
         Object service = getService(serviceInfo);
 159  0
         Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes());
 160  0
         return method.invoke(service, methodCall.getArguments());
 161  
     }
 162  
 
 163  
     protected Object getService(ServiceInfo serviceInfo) {
 164  
         Object service;
 165  0
         if (serviceInfo.getServiceDefinition().getQueue()) {
 166  0
             service = getQueueService(serviceInfo);
 167  
         } else {
 168  0
             service = getTopicService(serviceInfo);
 169  
         }
 170  0
         return service;
 171  
     }
 172  
 
 173  
     /**
 174  
      * Get the service as a topic. This means we want to contact every service
 175  
      * that is a part of this topic. We've grabbed all the services that are a
 176  
      * part of this topic and we want to make sure that we get everyone of them =
 177  
      * that is we want to circumvent loadbalancing and therefore not ask for the
 178  
      * service by it's name but the url to get the exact service we want.
 179  
      * 
 180  
      * @param serviceInfo
 181  
      * @return
 182  
      */
 183  
     protected Object getTopicService(ServiceInfo serviceInfo) {
 184  
         // get the service locally if we have it so we don't go through any
 185  
         // remoting
 186  0
         RemotedServiceRegistry remoteRegistry = KSBServiceLocator.getServiceDeployer();
 187  0
         Object service = remoteRegistry.getService(serviceInfo.getQname(), serviceInfo.getEndpointUrl());
 188  0
         if (service != null) {
 189  0
             return service;
 190  
         }
 191  0
         RemoteResourceServiceLocator remoteResourceLocator = KSBResourceLoaderFactory.getRemoteResourceLocator();
 192  0
         return remoteResourceLocator.getService(serviceInfo.getQname(), serviceInfo.getEndpointUrl());
 193  
     }
 194  
 
 195  
     /**
 196  
      * Because this is a queue we just need to grab one.
 197  
      * 
 198  
      * @param serviceInfo
 199  
      * @return
 200  
      */
 201  
     protected Object getQueueService(ServiceInfo serviceInfo) {
 202  0
         RemotedServiceRegistry remoteRegistry = KSBServiceLocator.getServiceDeployer();
 203  0
         Object service = remoteRegistry.getLocalService(serviceInfo.getQname());
 204  0
         if (service != null) {
 205  0
             return service;
 206  
         }
 207  
         // get client to remote service if not in our local repository
 208  0
         return GlobalResourceLoader.getService(serviceInfo.getQname());
 209  
     }
 210  
 
 211  
     /**
 212  
      * Used in case the thread that dumped this work into the queue is waiting
 213  
      * for the work to be done to continue processing.
 214  
      * 
 215  
      * @param callback
 216  
      */
 217  
     protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) {
 218  0
         AsynchronousCallback callback = methodCall.getCallback();
 219  0
         notifyOnCallback(methodCall, callback, callResult);
 220  0
     }
 221  
 
 222  
     protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) {
 223  0
         if (LOG.isDebugEnabled()) {
 224  0
             LOG.debug("Notifying global callbacks");
 225  
         }
 226  0
         for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) {
 227  0
             notifyOnCallback(methodCall, globalCallBack, callResult);
 228  
         }
 229  0
     }
 230  
 
 231  
     protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) {
 232  0
         if (callback != null) {
 233  
             try {
 234  0
                 synchronized (callback) {
 235  0
                     if (LOG.isDebugEnabled()) {
 236  0
                         LOG.debug("Notifying callback " + callback + " with callResult " + callResult);
 237  
                     }
 238  0
                     callback.notifyAll();
 239  0
                     if (callResult instanceof Serializable || callResult == null) {
 240  0
                         callback.callback((Serializable) callResult, methodCall);
 241  
                     } else {
 242  
                         // may never happen
 243  0
                         LOG.warn("Attempted to call callback with non-serializable object.");
 244  
                     }
 245  0
                 }
 246  0
             } catch (Throwable t) {
 247  0
                 LOG.error("Caught throwable from callback object " + callback.getClass(), t);
 248  0
             }
 249  
         }
 250  0
     }
 251  
 
 252  
     public PersistedMessageBO getMessage() {
 253  0
         return this.message;
 254  
     }
 255  
 
 256  
     public void setMessage(PersistedMessageBO message) {
 257  0
         this.message = message;
 258  0
     }
 259  
 
 260  
     public Object getService() {
 261  0
         return this.service;
 262  
     }
 263  
 
 264  
     public AsynchronousCall getMethodCall() {
 265  0
         return this.methodCall;
 266  
     }
 267  
 
 268  
     public void setMethodCall(AsynchronousCall methodCall) {
 269  0
         this.methodCall = methodCall;
 270  0
     }
 271  
 
 272  
     public void setService(Object service) {
 273  0
         this.service = service;
 274  0
     }
 275  
 }