View Javadoc

1   /**
2    * Copyright 2005-2012 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging;
17  
18  import java.io.Serializable;
19  import java.lang.reflect.Method;
20  import java.sql.Timestamp;
21  
22  import javax.xml.namespace.QName;
23  
24  import org.apache.log4j.Logger;
25  import org.kuali.rice.core.api.config.property.ConfigContext;
26  import org.kuali.rice.core.api.exception.RiceRuntimeException;
27  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
28  import org.kuali.rice.ksb.api.bus.Endpoint;
29  import org.kuali.rice.ksb.api.bus.ServiceBus;
30  import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
31  import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
32  import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
33  import org.kuali.rice.ksb.service.KSBServiceLocator;
34  import org.kuali.rice.ksb.util.KSBConstants;
35  import org.springframework.transaction.TransactionStatus;
36  import org.springframework.transaction.support.TransactionCallback;
37  
38  /**
39   * Handles invocation of a {@link PersistedMessageBO}.
40   * 
41   * @author Kuali Rice Team (rice.collab@kuali.org)
42   */
43  public class MessageServiceInvoker implements Runnable {
44  
45      protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class);
46  
47      private PersistedMessageBO message;
48      private Object service;
49      private AsynchronousCall methodCall;
50  
51      public MessageServiceInvoker(PersistedMessageBO message) {
52          this.message = message;
53      }
54  
55      public void run() {
56          LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId());
57          Object result = null;
58          try {
59              result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() {
60                  public Object doInTransaction(TransactionStatus status) {
61                      AsynchronousCall methodCall = getMessage().getPayload().getMethodCall();
62                      Object result = null;
63                      try {
64                          result = invokeService(methodCall);
65                          KSBServiceLocator.getMessageQueueService().delete(getMessage());
66                      } catch (Throwable t) {
67                          LOG.warn("Caught throwable making async service call " + methodCall, t);
68                          throw new MessageProcessingException(t);
69                      }
70                      return result;
71                  }
72              });
73          } catch (Throwable t) {
74          	// if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code
75          	// however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well
76          	boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING));
77       	 	if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
78       	 		if (t instanceof RuntimeException) {
79       	 			throw (RuntimeException)t;
80       	 		}
81       	 		throw new RiceRuntimeException(t);
82       	 	} else {
83       	 		placeInExceptionRouting(t, getMethodCall(), getService());
84       	 	}
85          } finally {
86              try {
87                  notifyOnCallback(methodCall, result);
88              } catch (Exception e) {
89                  LOG.warn("Exception caught notifying callback", e);
90              }
91              try {
92                  notifyGlobalCallbacks(methodCall, result);
93              } catch (Exception e) {
94                  LOG.warn("Exception caught notifying callback", e);
95              }
96          }
97      }
98  
99      /**
100      * Executed when an exception is encountered during message invocation.
101      * Attempts to call the ExceptionHandler for the message, if that fails it
102      * will attempt to set the status of the message in the queue to
103      * "EXCEPTION".
104      */
105     protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) {
106         LOG.error("Error processing message: " + this.message, t);
107         final Throwable throwable;
108         if (t instanceof MessageProcessingException) {
109             throwable = t.getCause();
110         } else {
111             throwable = t;
112         }
113         try {
114         	try {
115         		KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service);
116         	} catch (Throwable t1) {
117         		KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service);
118         	}
119         } catch (Throwable t2) {
120             LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2);
121             message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
122             message.setQueueDate(new Timestamp(System.currentTimeMillis()));
123             try {
124                 KSBServiceLocator.getMessageQueueService().save(message);
125             } catch (Throwable t3) {
126                 LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3);
127             }
128         }
129     }
130 
131     /**
132      * Invokes the AsynchronousCall represented on the methodCall on the service
133      * contained in the ServiceInfo object on the AsynchronousCall.
134      * 
135      */
136     protected Object invokeService(AsynchronousCall methodCall) throws Exception {
137         this.methodCall = methodCall;
138         ServiceConfiguration serviceConfiguration = methodCall.getServiceConfiguration();
139         QName serviceName = serviceConfiguration.getServiceName();
140         if (LOG.isDebugEnabled()) {
141             LOG.debug("Attempting to call service " + serviceName);
142         }
143 
144         Object service = getService(serviceConfiguration);
145         if (service == null) {
146         	throw new RiceRuntimeException("Failed to locate service endpoint for message: " + serviceConfiguration);
147         }
148         Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes());
149         return method.invoke(service, methodCall.getArguments());
150     }
151 
152     protected Object getService(ServiceConfiguration serviceConfiguration) {
153         Object service;
154         if (serviceConfiguration.isQueue()) {
155             service = getQueueService(serviceConfiguration);
156         } else {
157             service = getTopicService(serviceConfiguration);
158         }
159         return service;
160     }
161 
162     /**
163      * Get the service as a topic. This means we want to contact every service
164      * that is a part of this topic. We've grabbed all the services that are a
165      * part of this topic and we want to make sure that we get everyone of them =
166      * that is we want to circumvent loadbalancing and therefore not ask for the
167      * service by it's name but the url to get the exact service we want.
168      * 
169      * @param serviceInfo
170      * @return
171      */
172     protected Object getTopicService(ServiceConfiguration serviceConfiguration) {
173         // get the service locally if we have it so we don't go through any
174         // remoting
175         ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
176         Endpoint endpoint = serviceBus.getConfiguredEndpoint(serviceConfiguration);
177         if (endpoint == null) {
178         	return null;
179         }
180         return endpoint.getService();
181     }
182 
183     /**
184      * Because this is a queue we just need to grab one.
185      * 
186      * @param serviceInfo
187      * @return
188      */
189     protected Object getQueueService(ServiceConfiguration serviceConfiguration) {
190     	ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
191     	return serviceBus.getService(serviceConfiguration.getServiceName());
192     }
193 
194     /**
195      * Used in case the thread that dumped this work into the queue is waiting
196      * for the work to be done to continue processing.
197      * 
198      * @param callback
199      */
200     protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) {
201         AsynchronousCallback callback = methodCall.getCallback();
202         notifyOnCallback(methodCall, callback, callResult);
203     }
204 
205     protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) {
206         if (LOG.isDebugEnabled()) {
207             LOG.debug("Notifying global callbacks");
208         }
209         for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) {
210             notifyOnCallback(methodCall, globalCallBack, callResult);
211         }
212     }
213 
214     protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) {
215         if (callback != null) {
216             try {
217                 synchronized (callback) {
218                     if (LOG.isDebugEnabled()) {
219                         LOG.debug("Notifying callback " + callback + " with callResult " + callResult);
220                     }
221                     callback.notifyAll();
222                     if (callResult instanceof Serializable || callResult == null) {
223                         callback.callback((Serializable) callResult, methodCall);
224                     } else {
225                         // may never happen
226                         LOG.warn("Attempted to call callback with non-serializable object.");
227                     }
228                 }
229             } catch (Throwable t) {
230                 LOG.error("Caught throwable from callback object " + callback.getClass(), t);
231             }
232         }
233     }
234 
235     public PersistedMessageBO getMessage() {
236         return this.message;
237     }
238 
239     public void setMessage(PersistedMessageBO message) {
240         this.message = message;
241     }
242 
243     public Object getService() {
244         return this.service;
245     }
246 
247     public AsynchronousCall getMethodCall() {
248         return this.methodCall;
249     }
250 
251     public void setMethodCall(AsynchronousCall methodCall) {
252         this.methodCall = methodCall;
253     }
254 
255     public void setService(Object service) {
256         this.service = service;
257     }
258 }