001 /** 002 * Copyright 2005-2012 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.ksb.messaging; 017 018 import java.io.Serializable; 019 import java.lang.reflect.Method; 020 import java.sql.Timestamp; 021 022 import javax.xml.namespace.QName; 023 024 import org.apache.log4j.Logger; 025 import org.kuali.rice.core.api.config.property.Config; 026 import org.kuali.rice.core.api.config.property.ConfigContext; 027 import org.kuali.rice.core.api.exception.RiceRuntimeException; 028 import org.kuali.rice.ksb.api.KsbApiServiceLocator; 029 import org.kuali.rice.ksb.api.bus.Endpoint; 030 import org.kuali.rice.ksb.api.bus.ServiceBus; 031 import org.kuali.rice.ksb.api.bus.ServiceConfiguration; 032 import org.kuali.rice.ksb.api.messaging.AsynchronousCall; 033 import org.kuali.rice.ksb.api.messaging.AsynchronousCallback; 034 import org.kuali.rice.ksb.service.KSBServiceLocator; 035 import org.kuali.rice.ksb.util.KSBConstants; 036 import org.springframework.transaction.TransactionStatus; 037 import org.springframework.transaction.support.TransactionCallback; 038 039 /** 040 * Handles invocation of a {@link PersistedMessageBO}. 041 * 042 * @author Kuali Rice Team (rice.collab@kuali.org) 043 */ 044 public class MessageServiceInvoker implements Runnable { 045 046 protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class); 047 048 private PersistedMessageBO message; 049 private Object service; 050 private AsynchronousCall methodCall; 051 052 public MessageServiceInvoker(PersistedMessageBO message) { 053 this.message = message; 054 } 055 056 public void run() { 057 LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId()); 058 if(ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.MESSAGE_PERSISTENCE)) { 059 PersistedMessagePayload messageFromDB = KSBServiceLocator.getMessageQueueService().findByPersistedMessageByRouteQueueId(getMessage().getRouteQueueId()); 060 if(messageFromDB == null) { 061 // If the message is no longer found in the database we should skip this processing 062 return; 063 } 064 } 065 Object result = null; 066 try { 067 result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() { 068 public Object doInTransaction(TransactionStatus status) { 069 AsynchronousCall methodCall = getMessage().getPayload().getMethodCall(); 070 Object result = null; 071 try { 072 result = invokeService(methodCall); 073 KSBServiceLocator.getMessageQueueService().delete(getMessage()); 074 } catch (Throwable t) { 075 LOG.warn("Caught throwable making async service call " + methodCall, t); 076 throw new MessageProcessingException(t); 077 } 078 return result; 079 } 080 }); 081 } catch (Throwable t) { 082 // if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code 083 // however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well 084 boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING)); 085 if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) { 086 if (t instanceof RuntimeException) { 087 throw (RuntimeException)t; 088 } 089 throw new RiceRuntimeException(t); 090 } else { 091 placeInExceptionRouting(t, getMethodCall(), getService()); 092 } 093 } finally { 094 try { 095 notifyOnCallback(methodCall, result); 096 } catch (Exception e) { 097 LOG.warn("Exception caught notifying callback", e); 098 } 099 try { 100 notifyGlobalCallbacks(methodCall, result); 101 } catch (Exception e) { 102 LOG.warn("Exception caught notifying callback", e); 103 } 104 } 105 } 106 107 /** 108 * Executed when an exception is encountered during message invocation. 109 * Attempts to call the ExceptionHandler for the message, if that fails it 110 * will attempt to set the status of the message in the queue to 111 * "EXCEPTION". 112 */ 113 protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) { 114 LOG.error("Error processing message: " + this.message, t); 115 final Throwable throwable; 116 if (t instanceof MessageProcessingException) { 117 throwable = t.getCause(); 118 } else { 119 throwable = t; 120 } 121 try { 122 try { 123 KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service); 124 } catch (Throwable t1) { 125 KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service); 126 } 127 } catch (Throwable t2) { 128 LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2); 129 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION); 130 message.setQueueDate(new Timestamp(System.currentTimeMillis())); 131 try { 132 KSBServiceLocator.getMessageQueueService().save(message); 133 } catch (Throwable t3) { 134 LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3); 135 } 136 } 137 } 138 139 /** 140 * Invokes the AsynchronousCall represented on the methodCall on the service 141 * contained in the ServiceInfo object on the AsynchronousCall. 142 * 143 */ 144 protected Object invokeService(AsynchronousCall methodCall) throws Exception { 145 this.methodCall = methodCall; 146 ServiceConfiguration serviceConfiguration = methodCall.getServiceConfiguration(); 147 QName serviceName = serviceConfiguration.getServiceName(); 148 if (LOG.isDebugEnabled()) { 149 LOG.debug("Attempting to call service " + serviceName); 150 } 151 152 Object service = getService(serviceConfiguration); 153 if (service == null) { 154 throw new RiceRuntimeException("Failed to locate service endpoint for message: " + serviceConfiguration); 155 } 156 Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes()); 157 return method.invoke(service, methodCall.getArguments()); 158 } 159 160 protected Object getService(ServiceConfiguration serviceConfiguration) { 161 Object service; 162 if (serviceConfiguration.isQueue()) { 163 service = getQueueService(serviceConfiguration); 164 } else { 165 service = getTopicService(serviceConfiguration); 166 } 167 return service; 168 } 169 170 /** 171 * Get the service as a topic. This means we want to contact every service 172 * that is a part of this topic. We've grabbed all the services that are a 173 * part of this topic and we want to make sure that we get everyone of them = 174 * that is we want to circumvent loadbalancing and therefore not ask for the 175 * service by it's name but the url to get the exact service we want. 176 * 177 * @param serviceInfo 178 * @return 179 */ 180 protected Object getTopicService(ServiceConfiguration serviceConfiguration) { 181 // get the service locally if we have it so we don't go through any 182 // remoting 183 ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus(); 184 Endpoint endpoint = serviceBus.getConfiguredEndpoint(serviceConfiguration); 185 if (endpoint == null) { 186 return null; 187 } 188 return endpoint.getService(); 189 } 190 191 /** 192 * Because this is a queue we just need to grab one. 193 * 194 * @param serviceInfo 195 * @return 196 */ 197 protected Object getQueueService(ServiceConfiguration serviceConfiguration) { 198 ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus(); 199 return serviceBus.getService(serviceConfiguration.getServiceName(), serviceConfiguration.getApplicationId()); 200 } 201 202 /** 203 * Used in case the thread that dumped this work into the queue is waiting 204 * for the work to be done to continue processing. 205 * 206 * @param callback 207 */ 208 protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) { 209 AsynchronousCallback callback = methodCall.getCallback(); 210 notifyOnCallback(methodCall, callback, callResult); 211 } 212 213 protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) { 214 if (LOG.isDebugEnabled()) { 215 LOG.debug("Notifying global callbacks"); 216 } 217 for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) { 218 notifyOnCallback(methodCall, globalCallBack, callResult); 219 } 220 } 221 222 protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) { 223 if (callback != null) { 224 try { 225 synchronized (callback) { 226 if (LOG.isDebugEnabled()) { 227 LOG.debug("Notifying callback " + callback + " with callResult " + callResult); 228 } 229 callback.notifyAll(); 230 if (callResult instanceof Serializable || callResult == null) { 231 callback.callback((Serializable) callResult, methodCall); 232 } else { 233 // may never happen 234 LOG.warn("Attempted to call callback with non-serializable object."); 235 } 236 } 237 } catch (Throwable t) { 238 LOG.error("Caught throwable from callback object " + callback.getClass(), t); 239 } 240 } 241 } 242 243 public PersistedMessageBO getMessage() { 244 return this.message; 245 } 246 247 public void setMessage(PersistedMessageBO message) { 248 this.message = message; 249 } 250 251 public Object getService() { 252 return this.service; 253 } 254 255 public AsynchronousCall getMethodCall() { 256 return this.methodCall; 257 } 258 259 public void setMethodCall(AsynchronousCall methodCall) { 260 this.methodCall = methodCall; 261 } 262 263 public void setService(Object service) { 264 this.service = service; 265 } 266 }