Coverage Report - org.kuali.rice.ksb.messaging.MessageServiceInvoker
 
Classes in this File Line Coverage Branch Coverage Complexity
MessageServiceInvoker
0%
0/108
0%
0/34
2.824
MessageServiceInvoker$1
0%
0/10
N/A
2.824
 
 1  
 /*
 2  
  * Copyright 2005-2008 The Kuali Foundation
 3  
  * 
 4  
  * 
 5  
  * Licensed under the Educational Community License, Version 2.0 (the "License"); you may not use this file except in
 6  
  * compliance with the License. You may obtain a copy of the License at
 7  
  * 
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  * 
 10  
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS
 11  
  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
 12  
  * language governing permissions and limitations under the License.
 13  
  */
 14  
 package org.kuali.rice.ksb.messaging;
 15  
 
 16  
 import java.io.Serializable;
 17  
 import java.lang.reflect.Method;
 18  
 import java.sql.Timestamp;
 19  
 import java.util.List;
 20  
 
 21  
 import javax.xml.namespace.QName;
 22  
 
 23  
 import org.apache.log4j.Logger;
 24  
 import org.kuali.rice.core.config.ConfigContext;
 25  
 import org.kuali.rice.core.exception.RiceRuntimeException;
 26  
 import org.kuali.rice.core.resourceloader.GlobalResourceLoader;
 27  
 import org.kuali.rice.ksb.messaging.callforwarding.ForwardedCallHandler;
 28  
 import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
 29  
 import org.kuali.rice.ksb.service.KSBServiceLocator;
 30  
 import org.kuali.rice.ksb.util.KSBConstants;
 31  
 import org.springframework.transaction.TransactionStatus;
 32  
 import org.springframework.transaction.support.TransactionCallback;
 33  
 
 34  
 /**
 35  
  * Handles invocation of a {@link PersistedMessage}.
 36  
  * 
 37  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 38  
  */
 39  
 public class MessageServiceInvoker implements Runnable {
 40  
 
 41  0
     protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class);
 42  
 
 43  
     private PersistedMessage message;
 44  
     private Object service;
 45  
     private AsynchronousCall methodCall;
 46  
 
 47  0
     public MessageServiceInvoker(PersistedMessage message) {
 48  0
         this.message = message;
 49  0
     }
 50  
 
 51  
     public void run() {
 52  0
         LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId());
 53  0
         Object result = null;
 54  
         try {
 55  0
             result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback() {
 56  
                 public Object doInTransaction(TransactionStatus status) {
 57  0
                     AsynchronousCall methodCall = getMessage().getPayload().getMethodCall();
 58  0
                     Object result = null;
 59  
                     try {
 60  0
                         result = invokeService(methodCall);
 61  0
                         KSBServiceLocator.getRouteQueueService().delete(getMessage());
 62  0
                     } catch (Throwable t) {
 63  0
                         LOG.warn("Caught throwable making async service call " + methodCall, t);
 64  0
                         throw new MessageProcessingException(t);
 65  0
                     }
 66  0
                     return result;
 67  
                 }
 68  
             });
 69  0
         } catch (Throwable t) {
 70  
                 // if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code
 71  
                 // however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well
 72  0
                 boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.ALLOW_SYNC_EXCEPTION_ROUTING));
 73  0
                       if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.MESSAGE_DELIVERY))) {
 74  0
                               if (t instanceof RuntimeException) {
 75  0
                                       throw (RuntimeException)t;
 76  
                               }
 77  0
                               throw new RiceRuntimeException(t);
 78  
                       } else {
 79  0
                               placeInExceptionRouting(t, getMethodCall(), getService());
 80  
                       }
 81  0
         } finally {
 82  0
             try {
 83  0
                 notifyOnCallback(methodCall, result);
 84  0
             } catch (Exception e) {
 85  0
                 LOG.warn("Exception caught notifying callback", e);
 86  0
             }
 87  
             try {
 88  0
                 notifyGlobalCallbacks(methodCall, result);
 89  0
             } catch (Exception e) {
 90  0
                 LOG.warn("Exception caught notifying callback", e);
 91  0
             }
 92  0
         }
 93  0
     }
 94  
 
 95  
     /**
 96  
      * Executed when an exception is encountered during message invocation.
 97  
      * Attempts to call the ExceptionHandler for the message, if that fails it
 98  
      * will attempt to set the status of the message in the queue to
 99  
      * "EXCEPTION".
 100  
      */
 101  
     protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) {
 102  0
         LOG.error("Error processing message: " + this.message, t);
 103  
         final Throwable throwable;
 104  0
         if (t instanceof MessageProcessingException) {
 105  0
             throwable = t.getCause();
 106  
         } else {
 107  0
             throwable = t;
 108  
         }
 109  
         try {
 110  
                 try {
 111  0
                         KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service);
 112  0
                 } catch (Throwable t1) {
 113  0
                         KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service);
 114  0
                 }
 115  0
         } catch (Throwable t2) {
 116  0
             LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2);
 117  0
             message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
 118  0
             message.setQueueDate(new Timestamp(System.currentTimeMillis()));
 119  
             try {
 120  0
                 KSBServiceLocator.getRouteQueueService().save(message);
 121  0
             } catch (Throwable t3) {
 122  0
                 LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3);
 123  0
             }
 124  0
         }
 125  0
     }
 126  
 
 127  
     /**
 128  
      * Invokes the AsynchronousCall represented on the methodCall on the service
 129  
      * contained in the ServiceInfo object on the AsynchronousCall.
 130  
      * 
 131  
      */
 132  
     protected Object invokeService(AsynchronousCall methodCall) throws Exception {
 133  0
         this.methodCall = methodCall;
 134  0
         ServiceInfo serviceInfo = methodCall.getServiceInfo();
 135  0
         if (LOG.isDebugEnabled()) {
 136  0
             LOG.debug("Attempting to call service " + serviceInfo.getQname());
 137  
         }
 138  
 
 139  0
         if (ConfigContext.getCurrentContextConfig().getStoreAndForward() && !methodCall.isIgnoreStoreAndForward()) {
 140  0
             QName serviceName = serviceInfo.getQname();
 141  0
             RemoteResourceServiceLocator remoteResourceLocator = KSBResourceLoaderFactory.getRemoteResourceLocator();
 142  0
             QName storeAndForwardName = new QName(serviceName.getNamespaceURI(), serviceName.getLocalPart() + KSBConstants.FORWARD_HANDLER_SUFFIX);
 143  0
             List<RemotedServiceHolder> forwardServices = remoteResourceLocator.getAllServices(storeAndForwardName);
 144  0
             if (forwardServices.isEmpty()) {
 145  0
                 LOG.warn("Could not find store and forward service " + storeAndForwardName + ".  Defaulting to regular messaging.");
 146  
             } else {
 147  0
                 serviceInfo = forwardServices.get(0).getServiceInfo();
 148  
             }
 149  0
             ForwardedCallHandler service = (ForwardedCallHandler) getService(serviceInfo);
 150  0
             this.message.setMethodCall(methodCall);
 151  0
             service.handleCall(this.message);
 152  0
             return null;
 153  
         }
 154  
 
 155  0
         Object service = getService(serviceInfo);
 156  0
         Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes());
 157  0
         return method.invoke(service, methodCall.getArguments());
 158  
     }
 159  
 
 160  
     protected Object getService(ServiceInfo serviceInfo) {
 161  
         Object service;
 162  0
         if (serviceInfo.getServiceDefinition().getQueue()) {
 163  0
             service = getQueueService(serviceInfo);
 164  
         } else {
 165  0
             service = getTopicService(serviceInfo);
 166  
         }
 167  0
         return service;
 168  
     }
 169  
 
 170  
     /**
 171  
      * Get the service as a topic. This means we want to contact every service
 172  
      * that is a part of this topic. We've grabbed all the services that are a
 173  
      * part of this topic and we want to make sure that we get everyone of them =
 174  
      * that is we want to circumvent loadbalancing and therefore not ask for the
 175  
      * service by it's name but the url to get the exact service we want.
 176  
      * 
 177  
      * @param serviceInfo
 178  
      * @return
 179  
      */
 180  
     protected Object getTopicService(ServiceInfo serviceInfo) {
 181  
         // get the service locally if we have it so we don't go through any
 182  
         // remoting
 183  0
         RemotedServiceRegistry remoteRegistry = KSBServiceLocator.getServiceDeployer();
 184  0
         Object service = remoteRegistry.getService(serviceInfo.getQname(), serviceInfo.getEndpointUrl());
 185  0
         if (service != null) {
 186  0
             return service;
 187  
         }
 188  0
         RemoteResourceServiceLocator remoteResourceLocator = KSBResourceLoaderFactory.getRemoteResourceLocator();
 189  0
         return remoteResourceLocator.getService(serviceInfo.getQname(), serviceInfo.getEndpointUrl());
 190  
     }
 191  
 
 192  
     /**
 193  
      * Because this is a queue we just need to grab one.
 194  
      * 
 195  
      * @param serviceInfo
 196  
      * @return
 197  
      */
 198  
     protected Object getQueueService(ServiceInfo serviceInfo) {
 199  0
         RemotedServiceRegistry remoteRegistry = KSBServiceLocator.getServiceDeployer();
 200  0
         Object service = remoteRegistry.getLocalService(serviceInfo.getQname());
 201  0
         if (service != null) {
 202  0
             return service;
 203  
         }
 204  
         // get client to remote service if not in our local repository
 205  0
         return GlobalResourceLoader.getService(serviceInfo.getQname());
 206  
     }
 207  
 
 208  
     /**
 209  
      * Used in case the thread that dumped this work into the queue is waiting
 210  
      * for the work to be done to continue processing.
 211  
      * 
 212  
      * @param callback
 213  
      */
 214  
     protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) {
 215  0
         AsynchronousCallback callback = methodCall.getCallback();
 216  0
         notifyOnCallback(methodCall, callback, callResult);
 217  0
     }
 218  
 
 219  
     protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) {
 220  0
         if (LOG.isDebugEnabled()) {
 221  0
             LOG.debug("Notifying global callbacks");
 222  
         }
 223  0
         for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) {
 224  0
             notifyOnCallback(methodCall, globalCallBack, callResult);
 225  
         }
 226  0
     }
 227  
 
 228  
     protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) {
 229  0
         if (callback != null) {
 230  
             try {
 231  0
                 synchronized (callback) {
 232  0
                     if (LOG.isDebugEnabled()) {
 233  0
                         LOG.debug("Notifying callback " + callback + " with callResult " + callResult);
 234  
                     }
 235  0
                     callback.notifyAll();
 236  0
                     if (callResult instanceof Serializable || callResult == null) {
 237  0
                         callback.callback((Serializable) callResult, methodCall);
 238  
                     } else {
 239  
                         // may never happen
 240  0
                         LOG.warn("Attempted to call callback with non-serializable object.");
 241  
                     }
 242  0
                 }
 243  0
             } catch (Throwable t) {
 244  0
                 LOG.error("Caught throwable from callback object " + callback.getClass(), t);
 245  0
             }
 246  
         }
 247  0
     }
 248  
 
 249  
     public PersistedMessage getMessage() {
 250  0
         return this.message;
 251  
     }
 252  
 
 253  
     public void setMessage(PersistedMessage message) {
 254  0
         this.message = message;
 255  0
     }
 256  
 
 257  
     public Object getService() {
 258  0
         return this.service;
 259  
     }
 260  
 
 261  
     public AsynchronousCall getMethodCall() {
 262  0
         return this.methodCall;
 263  
     }
 264  
 
 265  
     public void setMethodCall(AsynchronousCall methodCall) {
 266  0
         this.methodCall = methodCall;
 267  0
     }
 268  
 
 269  
     public void setService(Object service) {
 270  0
         this.service = service;
 271  0
     }
 272  
 }