001 /** 002 * Copyright 2005-2012 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.ksb.messaging; 017 018 import java.io.Serializable; 019 import java.lang.reflect.Method; 020 import java.sql.Timestamp; 021 022 import javax.xml.namespace.QName; 023 024 import org.apache.log4j.Logger; 025 import org.kuali.rice.core.api.config.property.ConfigContext; 026 import org.kuali.rice.core.api.exception.RiceRuntimeException; 027 import org.kuali.rice.ksb.api.KsbApiServiceLocator; 028 import org.kuali.rice.ksb.api.bus.Endpoint; 029 import org.kuali.rice.ksb.api.bus.ServiceBus; 030 import org.kuali.rice.ksb.api.bus.ServiceConfiguration; 031 import org.kuali.rice.ksb.api.messaging.AsynchronousCall; 032 import org.kuali.rice.ksb.api.messaging.AsynchronousCallback; 033 import org.kuali.rice.ksb.service.KSBServiceLocator; 034 import org.kuali.rice.ksb.util.KSBConstants; 035 import org.springframework.transaction.TransactionStatus; 036 import org.springframework.transaction.support.TransactionCallback; 037 038 /** 039 * Handles invocation of a {@link PersistedMessageBO}. 040 * 041 * @author Kuali Rice Team (rice.collab@kuali.org) 042 */ 043 public class MessageServiceInvoker implements Runnable { 044 045 protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class); 046 047 private PersistedMessageBO message; 048 private Object service; 049 private AsynchronousCall methodCall; 050 051 public MessageServiceInvoker(PersistedMessageBO message) { 052 this.message = message; 053 } 054 055 public void run() { 056 LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId()); 057 Object result = null; 058 try { 059 result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() { 060 public Object doInTransaction(TransactionStatus status) { 061 AsynchronousCall methodCall = getMessage().getPayload().getMethodCall(); 062 Object result = null; 063 try { 064 result = invokeService(methodCall); 065 KSBServiceLocator.getMessageQueueService().delete(getMessage()); 066 } catch (Throwable t) { 067 LOG.warn("Caught throwable making async service call " + methodCall, t); 068 throw new MessageProcessingException(t); 069 } 070 return result; 071 } 072 }); 073 } catch (Throwable t) { 074 // if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code 075 // however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well 076 boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING)); 077 if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) { 078 if (t instanceof RuntimeException) { 079 throw (RuntimeException)t; 080 } 081 throw new RiceRuntimeException(t); 082 } else { 083 placeInExceptionRouting(t, getMethodCall(), getService()); 084 } 085 } finally { 086 try { 087 notifyOnCallback(methodCall, result); 088 } catch (Exception e) { 089 LOG.warn("Exception caught notifying callback", e); 090 } 091 try { 092 notifyGlobalCallbacks(methodCall, result); 093 } catch (Exception e) { 094 LOG.warn("Exception caught notifying callback", e); 095 } 096 } 097 } 098 099 /** 100 * Executed when an exception is encountered during message invocation. 101 * Attempts to call the ExceptionHandler for the message, if that fails it 102 * will attempt to set the status of the message in the queue to 103 * "EXCEPTION". 104 */ 105 protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) { 106 LOG.error("Error processing message: " + this.message, t); 107 final Throwable throwable; 108 if (t instanceof MessageProcessingException) { 109 throwable = t.getCause(); 110 } else { 111 throwable = t; 112 } 113 try { 114 try { 115 KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service); 116 } catch (Throwable t1) { 117 KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service); 118 } 119 } catch (Throwable t2) { 120 LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2); 121 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION); 122 message.setQueueDate(new Timestamp(System.currentTimeMillis())); 123 try { 124 KSBServiceLocator.getMessageQueueService().save(message); 125 } catch (Throwable t3) { 126 LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3); 127 } 128 } 129 } 130 131 /** 132 * Invokes the AsynchronousCall represented on the methodCall on the service 133 * contained in the ServiceInfo object on the AsynchronousCall. 134 * 135 */ 136 protected Object invokeService(AsynchronousCall methodCall) throws Exception { 137 this.methodCall = methodCall; 138 ServiceConfiguration serviceConfiguration = methodCall.getServiceConfiguration(); 139 QName serviceName = serviceConfiguration.getServiceName(); 140 if (LOG.isDebugEnabled()) { 141 LOG.debug("Attempting to call service " + serviceName); 142 } 143 144 Object service = getService(serviceConfiguration); 145 if (service == null) { 146 throw new RiceRuntimeException("Failed to locate service endpoint for message: " + serviceConfiguration); 147 } 148 Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes()); 149 return method.invoke(service, methodCall.getArguments()); 150 } 151 152 protected Object getService(ServiceConfiguration serviceConfiguration) { 153 Object service; 154 if (serviceConfiguration.isQueue()) { 155 service = getQueueService(serviceConfiguration); 156 } else { 157 service = getTopicService(serviceConfiguration); 158 } 159 return service; 160 } 161 162 /** 163 * Get the service as a topic. This means we want to contact every service 164 * that is a part of this topic. We've grabbed all the services that are a 165 * part of this topic and we want to make sure that we get everyone of them = 166 * that is we want to circumvent loadbalancing and therefore not ask for the 167 * service by it's name but the url to get the exact service we want. 168 * 169 * @param serviceInfo 170 * @return 171 */ 172 protected Object getTopicService(ServiceConfiguration serviceConfiguration) { 173 // get the service locally if we have it so we don't go through any 174 // remoting 175 ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus(); 176 Endpoint endpoint = serviceBus.getConfiguredEndpoint(serviceConfiguration); 177 if (endpoint == null) { 178 return null; 179 } 180 return endpoint.getService(); 181 } 182 183 /** 184 * Because this is a queue we just need to grab one. 185 * 186 * @param serviceInfo 187 * @return 188 */ 189 protected Object getQueueService(ServiceConfiguration serviceConfiguration) { 190 ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus(); 191 return serviceBus.getService(serviceConfiguration.getServiceName()); 192 } 193 194 /** 195 * Used in case the thread that dumped this work into the queue is waiting 196 * for the work to be done to continue processing. 197 * 198 * @param callback 199 */ 200 protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) { 201 AsynchronousCallback callback = methodCall.getCallback(); 202 notifyOnCallback(methodCall, callback, callResult); 203 } 204 205 protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) { 206 if (LOG.isDebugEnabled()) { 207 LOG.debug("Notifying global callbacks"); 208 } 209 for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) { 210 notifyOnCallback(methodCall, globalCallBack, callResult); 211 } 212 } 213 214 protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) { 215 if (callback != null) { 216 try { 217 synchronized (callback) { 218 if (LOG.isDebugEnabled()) { 219 LOG.debug("Notifying callback " + callback + " with callResult " + callResult); 220 } 221 callback.notifyAll(); 222 if (callResult instanceof Serializable || callResult == null) { 223 callback.callback((Serializable) callResult, methodCall); 224 } else { 225 // may never happen 226 LOG.warn("Attempted to call callback with non-serializable object."); 227 } 228 } 229 } catch (Throwable t) { 230 LOG.error("Caught throwable from callback object " + callback.getClass(), t); 231 } 232 } 233 } 234 235 public PersistedMessageBO getMessage() { 236 return this.message; 237 } 238 239 public void setMessage(PersistedMessageBO message) { 240 this.message = message; 241 } 242 243 public Object getService() { 244 return this.service; 245 } 246 247 public AsynchronousCall getMethodCall() { 248 return this.methodCall; 249 } 250 251 public void setMethodCall(AsynchronousCall methodCall) { 252 this.methodCall = methodCall; 253 } 254 255 public void setService(Object service) { 256 this.service = service; 257 } 258 }