001 /**
002 * Copyright 2005-2012 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.ksb.messaging;
017
018 import java.io.Serializable;
019 import java.lang.reflect.Method;
020 import java.sql.Timestamp;
021
022 import javax.xml.namespace.QName;
023
024 import org.apache.log4j.Logger;
025 import org.kuali.rice.core.api.config.property.ConfigContext;
026 import org.kuali.rice.core.api.exception.RiceRuntimeException;
027 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
028 import org.kuali.rice.ksb.api.bus.Endpoint;
029 import org.kuali.rice.ksb.api.bus.ServiceBus;
030 import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
031 import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
032 import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
033 import org.kuali.rice.ksb.service.KSBServiceLocator;
034 import org.kuali.rice.ksb.util.KSBConstants;
035 import org.springframework.transaction.TransactionStatus;
036 import org.springframework.transaction.support.TransactionCallback;
037
038 /**
039 * Handles invocation of a {@link PersistedMessageBO}.
040 *
041 * @author Kuali Rice Team (rice.collab@kuali.org)
042 */
043 public class MessageServiceInvoker implements Runnable {
044
045 protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class);
046
047 private PersistedMessageBO message;
048 private Object service;
049 private AsynchronousCall methodCall;
050
051 public MessageServiceInvoker(PersistedMessageBO message) {
052 this.message = message;
053 }
054
055 public void run() {
056 LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId());
057 Object result = null;
058 try {
059 result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() {
060 public Object doInTransaction(TransactionStatus status) {
061 AsynchronousCall methodCall = getMessage().getPayload().getMethodCall();
062 Object result = null;
063 try {
064 result = invokeService(methodCall);
065 KSBServiceLocator.getMessageQueueService().delete(getMessage());
066 } catch (Throwable t) {
067 LOG.warn("Caught throwable making async service call " + methodCall, t);
068 throw new MessageProcessingException(t);
069 }
070 return result;
071 }
072 });
073 } catch (Throwable t) {
074 // if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code
075 // however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well
076 boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING));
077 if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
078 if (t instanceof RuntimeException) {
079 throw (RuntimeException)t;
080 }
081 throw new RiceRuntimeException(t);
082 } else {
083 placeInExceptionRouting(t, getMethodCall(), getService());
084 }
085 } finally {
086 try {
087 notifyOnCallback(methodCall, result);
088 } catch (Exception e) {
089 LOG.warn("Exception caught notifying callback", e);
090 }
091 try {
092 notifyGlobalCallbacks(methodCall, result);
093 } catch (Exception e) {
094 LOG.warn("Exception caught notifying callback", e);
095 }
096 }
097 }
098
099 /**
100 * Executed when an exception is encountered during message invocation.
101 * Attempts to call the ExceptionHandler for the message, if that fails it
102 * will attempt to set the status of the message in the queue to
103 * "EXCEPTION".
104 */
105 protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) {
106 LOG.error("Error processing message: " + this.message, t);
107 final Throwable throwable;
108 if (t instanceof MessageProcessingException) {
109 throwable = t.getCause();
110 } else {
111 throwable = t;
112 }
113 try {
114 try {
115 KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service);
116 } catch (Throwable t1) {
117 KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service);
118 }
119 } catch (Throwable t2) {
120 LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2);
121 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
122 message.setQueueDate(new Timestamp(System.currentTimeMillis()));
123 try {
124 KSBServiceLocator.getMessageQueueService().save(message);
125 } catch (Throwable t3) {
126 LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3);
127 }
128 }
129 }
130
131 /**
132 * Invokes the AsynchronousCall represented on the methodCall on the service
133 * contained in the ServiceInfo object on the AsynchronousCall.
134 *
135 */
136 protected Object invokeService(AsynchronousCall methodCall) throws Exception {
137 this.methodCall = methodCall;
138 ServiceConfiguration serviceConfiguration = methodCall.getServiceConfiguration();
139 QName serviceName = serviceConfiguration.getServiceName();
140 if (LOG.isDebugEnabled()) {
141 LOG.debug("Attempting to call service " + serviceName);
142 }
143
144 Object service = getService(serviceConfiguration);
145 if (service == null) {
146 throw new RiceRuntimeException("Failed to locate service endpoint for message: " + serviceConfiguration);
147 }
148 Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes());
149 return method.invoke(service, methodCall.getArguments());
150 }
151
152 protected Object getService(ServiceConfiguration serviceConfiguration) {
153 Object service;
154 if (serviceConfiguration.isQueue()) {
155 service = getQueueService(serviceConfiguration);
156 } else {
157 service = getTopicService(serviceConfiguration);
158 }
159 return service;
160 }
161
162 /**
163 * Get the service as a topic. This means we want to contact every service
164 * that is a part of this topic. We've grabbed all the services that are a
165 * part of this topic and we want to make sure that we get everyone of them =
166 * that is we want to circumvent loadbalancing and therefore not ask for the
167 * service by it's name but the url to get the exact service we want.
168 *
169 * @param serviceInfo
170 * @return
171 */
172 protected Object getTopicService(ServiceConfiguration serviceConfiguration) {
173 // get the service locally if we have it so we don't go through any
174 // remoting
175 ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
176 Endpoint endpoint = serviceBus.getConfiguredEndpoint(serviceConfiguration);
177 if (endpoint == null) {
178 return null;
179 }
180 return endpoint.getService();
181 }
182
183 /**
184 * Because this is a queue we just need to grab one.
185 *
186 * @param serviceInfo
187 * @return
188 */
189 protected Object getQueueService(ServiceConfiguration serviceConfiguration) {
190 ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
191 return serviceBus.getService(serviceConfiguration.getServiceName());
192 }
193
194 /**
195 * Used in case the thread that dumped this work into the queue is waiting
196 * for the work to be done to continue processing.
197 *
198 * @param callback
199 */
200 protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) {
201 AsynchronousCallback callback = methodCall.getCallback();
202 notifyOnCallback(methodCall, callback, callResult);
203 }
204
205 protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) {
206 if (LOG.isDebugEnabled()) {
207 LOG.debug("Notifying global callbacks");
208 }
209 for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) {
210 notifyOnCallback(methodCall, globalCallBack, callResult);
211 }
212 }
213
214 protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) {
215 if (callback != null) {
216 try {
217 synchronized (callback) {
218 if (LOG.isDebugEnabled()) {
219 LOG.debug("Notifying callback " + callback + " with callResult " + callResult);
220 }
221 callback.notifyAll();
222 if (callResult instanceof Serializable || callResult == null) {
223 callback.callback((Serializable) callResult, methodCall);
224 } else {
225 // may never happen
226 LOG.warn("Attempted to call callback with non-serializable object.");
227 }
228 }
229 } catch (Throwable t) {
230 LOG.error("Caught throwable from callback object " + callback.getClass(), t);
231 }
232 }
233 }
234
235 public PersistedMessageBO getMessage() {
236 return this.message;
237 }
238
239 public void setMessage(PersistedMessageBO message) {
240 this.message = message;
241 }
242
243 public Object getService() {
244 return this.service;
245 }
246
247 public AsynchronousCall getMethodCall() {
248 return this.methodCall;
249 }
250
251 public void setMethodCall(AsynchronousCall methodCall) {
252 this.methodCall = methodCall;
253 }
254
255 public void setService(Object service) {
256 this.service = service;
257 }
258 }