001 /**
002 * Copyright 2005-2012 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.ksb.messaging;
017
018 import java.io.Serializable;
019 import java.lang.reflect.Method;
020 import java.sql.Timestamp;
021
022 import javax.xml.namespace.QName;
023
024 import org.apache.log4j.Logger;
025 import org.kuali.rice.core.api.config.property.Config;
026 import org.kuali.rice.core.api.config.property.ConfigContext;
027 import org.kuali.rice.core.api.exception.RiceRuntimeException;
028 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
029 import org.kuali.rice.ksb.api.bus.Endpoint;
030 import org.kuali.rice.ksb.api.bus.ServiceBus;
031 import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
032 import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
033 import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
034 import org.kuali.rice.ksb.service.KSBServiceLocator;
035 import org.kuali.rice.ksb.util.KSBConstants;
036 import org.springframework.transaction.TransactionStatus;
037 import org.springframework.transaction.support.TransactionCallback;
038
039 /**
040 * Handles invocation of a {@link PersistedMessageBO}.
041 *
042 * @author Kuali Rice Team (rice.collab@kuali.org)
043 */
044 public class MessageServiceInvoker implements Runnable {
045
046 protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class);
047
048 private PersistedMessageBO message;
049 private Object service;
050 private AsynchronousCall methodCall;
051
052 public MessageServiceInvoker(PersistedMessageBO message) {
053 this.message = message;
054 }
055
056 public void run() {
057 LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId());
058 if(ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.MESSAGE_PERSISTENCE)) {
059 PersistedMessagePayload messageFromDB = KSBServiceLocator.getMessageQueueService().findByPersistedMessageByRouteQueueId(getMessage().getRouteQueueId());
060 if(messageFromDB == null) {
061 // If the message is no longer found in the database we should skip this processing
062 return;
063 }
064 }
065 Object result = null;
066 try {
067 result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() {
068 public Object doInTransaction(TransactionStatus status) {
069 AsynchronousCall methodCall = getMessage().getPayload().getMethodCall();
070 Object result = null;
071 try {
072 result = invokeService(methodCall);
073 KSBServiceLocator.getMessageQueueService().delete(getMessage());
074 } catch (Throwable t) {
075 LOG.warn("Caught throwable making async service call " + methodCall, t);
076 throw new MessageProcessingException(t);
077 }
078 return result;
079 }
080 });
081 } catch (Throwable t) {
082 // if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code
083 // however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well
084 boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING));
085 if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
086 if (t instanceof RuntimeException) {
087 throw (RuntimeException)t;
088 }
089 throw new RiceRuntimeException(t);
090 } else {
091 placeInExceptionRouting(t, getMethodCall(), getService());
092 }
093 } finally {
094 try {
095 notifyOnCallback(methodCall, result);
096 } catch (Exception e) {
097 LOG.warn("Exception caught notifying callback", e);
098 }
099 try {
100 notifyGlobalCallbacks(methodCall, result);
101 } catch (Exception e) {
102 LOG.warn("Exception caught notifying callback", e);
103 }
104 }
105 }
106
107 /**
108 * Executed when an exception is encountered during message invocation.
109 * Attempts to call the ExceptionHandler for the message, if that fails it
110 * will attempt to set the status of the message in the queue to
111 * "EXCEPTION".
112 */
113 protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) {
114 LOG.error("Error processing message: " + this.message, t);
115 final Throwable throwable;
116 if (t instanceof MessageProcessingException) {
117 throwable = t.getCause();
118 } else {
119 throwable = t;
120 }
121 try {
122 try {
123 KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service);
124 } catch (Throwable t1) {
125 KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service);
126 }
127 } catch (Throwable t2) {
128 LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2);
129 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
130 message.setQueueDate(new Timestamp(System.currentTimeMillis()));
131 try {
132 KSBServiceLocator.getMessageQueueService().save(message);
133 } catch (Throwable t3) {
134 LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3);
135 }
136 }
137 }
138
139 /**
140 * Invokes the AsynchronousCall represented on the methodCall on the service
141 * contained in the ServiceInfo object on the AsynchronousCall.
142 *
143 */
144 protected Object invokeService(AsynchronousCall methodCall) throws Exception {
145 this.methodCall = methodCall;
146 ServiceConfiguration serviceConfiguration = methodCall.getServiceConfiguration();
147 QName serviceName = serviceConfiguration.getServiceName();
148 if (LOG.isDebugEnabled()) {
149 LOG.debug("Attempting to call service " + serviceName);
150 }
151
152 Object service = getService(serviceConfiguration);
153 if (service == null) {
154 throw new RiceRuntimeException("Failed to locate service endpoint for message: " + serviceConfiguration);
155 }
156 Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes());
157 return method.invoke(service, methodCall.getArguments());
158 }
159
160 protected Object getService(ServiceConfiguration serviceConfiguration) {
161 Object service;
162 if (serviceConfiguration.isQueue()) {
163 service = getQueueService(serviceConfiguration);
164 } else {
165 service = getTopicService(serviceConfiguration);
166 }
167 return service;
168 }
169
170 /**
171 * Get the service as a topic. This means we want to contact every service
172 * that is a part of this topic. We've grabbed all the services that are a
173 * part of this topic and we want to make sure that we get everyone of them =
174 * that is we want to circumvent loadbalancing and therefore not ask for the
175 * service by it's name but the url to get the exact service we want.
176 *
177 * @param serviceInfo
178 * @return
179 */
180 protected Object getTopicService(ServiceConfiguration serviceConfiguration) {
181 // get the service locally if we have it so we don't go through any
182 // remoting
183 ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
184 Endpoint endpoint = serviceBus.getConfiguredEndpoint(serviceConfiguration);
185 if (endpoint == null) {
186 return null;
187 }
188 return endpoint.getService();
189 }
190
191 /**
192 * Because this is a queue we just need to grab one.
193 *
194 * @param serviceInfo
195 * @return
196 */
197 protected Object getQueueService(ServiceConfiguration serviceConfiguration) {
198 ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
199 return serviceBus.getService(serviceConfiguration.getServiceName(), serviceConfiguration.getApplicationId());
200 }
201
202 /**
203 * Used in case the thread that dumped this work into the queue is waiting
204 * for the work to be done to continue processing.
205 *
206 * @param callback
207 */
208 protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) {
209 AsynchronousCallback callback = methodCall.getCallback();
210 notifyOnCallback(methodCall, callback, callResult);
211 }
212
213 protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) {
214 if (LOG.isDebugEnabled()) {
215 LOG.debug("Notifying global callbacks");
216 }
217 for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) {
218 notifyOnCallback(methodCall, globalCallBack, callResult);
219 }
220 }
221
222 protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) {
223 if (callback != null) {
224 try {
225 synchronized (callback) {
226 if (LOG.isDebugEnabled()) {
227 LOG.debug("Notifying callback " + callback + " with callResult " + callResult);
228 }
229 callback.notifyAll();
230 if (callResult instanceof Serializable || callResult == null) {
231 callback.callback((Serializable) callResult, methodCall);
232 } else {
233 // may never happen
234 LOG.warn("Attempted to call callback with non-serializable object.");
235 }
236 }
237 } catch (Throwable t) {
238 LOG.error("Caught throwable from callback object " + callback.getClass(), t);
239 }
240 }
241 }
242
243 public PersistedMessageBO getMessage() {
244 return this.message;
245 }
246
247 public void setMessage(PersistedMessageBO message) {
248 this.message = message;
249 }
250
251 public Object getService() {
252 return this.service;
253 }
254
255 public AsynchronousCall getMethodCall() {
256 return this.methodCall;
257 }
258
259 public void setMethodCall(AsynchronousCall methodCall) {
260 this.methodCall = methodCall;
261 }
262
263 public void setService(Object service) {
264 this.service = service;
265 }
266 }