View Javadoc
1   /**
2    * Copyright 2005-2016 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging;
17  
18  import java.io.Serializable;
19  import java.lang.reflect.Method;
20  import java.sql.Timestamp;
21  
22  import javax.xml.namespace.QName;
23  
24  import org.apache.log4j.Logger;
25  import org.kuali.rice.core.api.config.property.Config;
26  import org.kuali.rice.core.api.config.property.ConfigContext;
27  import org.kuali.rice.core.api.exception.RiceRuntimeException;
28  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
29  import org.kuali.rice.ksb.api.bus.Endpoint;
30  import org.kuali.rice.ksb.api.bus.ServiceBus;
31  import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
32  import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
33  import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
34  import org.kuali.rice.ksb.service.KSBServiceLocator;
35  import org.kuali.rice.ksb.util.KSBConstants;
36  import org.springframework.transaction.TransactionStatus;
37  import org.springframework.transaction.support.TransactionCallback;
38  
39  /**
40   * Handles invocation of a {@link PersistedMessageBO}.
41   * 
42   * @author Kuali Rice Team (rice.collab@kuali.org)
43   */
44  public class MessageServiceInvoker implements Runnable {
45  
46      protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class);
47  
48      private PersistedMessageBO message;
49      private Object service;
50      private AsynchronousCall methodCall;
51  
52      public MessageServiceInvoker(PersistedMessageBO message) {
53          this.message = message;
54      }
55  
56      public void run() {
57          LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId());
58          if(ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.MESSAGE_PERSISTENCE)) {
59              PersistedMessagePayload messageFromDB = KSBServiceLocator.getMessageQueueService().findByPersistedMessageByRouteQueueId(getMessage().getRouteQueueId());
60              if(messageFromDB == null) {
61                  // If the message is no longer found in the database we should skip this processing
62                  return;
63              }
64          }
65          Object result = null;
66          try {
67              result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() {
68                  public Object doInTransaction(TransactionStatus status) {
69                      AsynchronousCall methodCall = getMessage().getPayload().getMethodCall();
70                      Object result = null;
71                      try {
72                          result = invokeService(methodCall);
73                          KSBServiceLocator.getMessageQueueService().delete(getMessage());
74                      } catch (Throwable t) {
75                          LOG.warn("Caught throwable making async service call " + methodCall, t);
76                          throw new MessageProcessingException(t);
77                      }
78                      return result;
79                  }
80              });
81          } catch (Throwable t) {
82          	// if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code
83          	// however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well
84          	boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING));
85       	 	if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
86       	 		if (t instanceof RuntimeException) {
87       	 			throw (RuntimeException)t;
88       	 		}
89       	 		throw new RiceRuntimeException(t);
90       	 	} else {
91       	 		placeInExceptionRouting(t, getMethodCall(), getService());
92       	 	}
93          } finally {
94              try {
95                  notifyOnCallback(methodCall, result);
96              } catch (Exception e) {
97                  LOG.warn("Exception caught notifying callback", e);
98              }
99              try {
100                 notifyGlobalCallbacks(methodCall, result);
101             } catch (Exception e) {
102                 LOG.warn("Exception caught notifying callback", e);
103             }
104         }
105     }
106 
107     /**
108      * Executed when an exception is encountered during message invocation.
109      * Attempts to call the ExceptionHandler for the message, if that fails it
110      * will attempt to set the status of the message in the queue to
111      * "EXCEPTION".
112      */
113     protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) {
114         LOG.error("Error processing message: " + this.message, t);
115         final Throwable throwable;
116         if (t instanceof MessageProcessingException) {
117             throwable = t.getCause();
118         } else {
119             throwable = t;
120         }
121         try {
122         	try {
123         		KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service);
124         	} catch (Throwable t1) {
125         		KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service);
126         	}
127         } catch (Throwable t2) {
128             LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2);
129             message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
130             message.setQueueDate(new Timestamp(System.currentTimeMillis()));
131             try {
132                 KSBServiceLocator.getMessageQueueService().save(message);
133             } catch (Throwable t3) {
134                 LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3);
135             }
136         }
137     }
138 
139     /**
140      * Invokes the AsynchronousCall represented on the methodCall on the service
141      * contained in the ServiceInfo object on the AsynchronousCall.
142      * 
143      */
144     protected Object invokeService(AsynchronousCall methodCall) throws Exception {
145         this.methodCall = methodCall;
146         ServiceConfiguration serviceConfiguration = methodCall.getServiceConfiguration();
147         QName serviceName = serviceConfiguration.getServiceName();
148         if (LOG.isDebugEnabled()) {
149             LOG.debug("Attempting to call service " + serviceName);
150         }
151 
152         Object service = getService(serviceConfiguration);
153         if (service == null) {
154         	throw new RiceRuntimeException("Failed to locate service endpoint for message: " + serviceConfiguration);
155         }
156         Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes());
157         return method.invoke(service, methodCall.getArguments());
158     }
159 
160     protected Object getService(ServiceConfiguration serviceConfiguration) {
161         Object service;
162         if (serviceConfiguration.isQueue()) {
163             service = getQueueService(serviceConfiguration);
164         } else {
165             service = getTopicService(serviceConfiguration);
166         }
167         return service;
168     }
169 
170     /**
171      * Get the service as a topic. This means we want to contact every service
172      * that is a part of this topic. We've grabbed all the services that are a
173      * part of this topic and we want to make sure that we get everyone of them =
174      * that is we want to circumvent loadbalancing and therefore not ask for the
175      * service by it's name but the url to get the exact service we want.
176      * 
177      * @param serviceInfo
178      * @return
179      */
180     protected Object getTopicService(ServiceConfiguration serviceConfiguration) {
181         // get the service locally if we have it so we don't go through any
182         // remoting
183         ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
184         Endpoint endpoint = serviceBus.getConfiguredEndpoint(serviceConfiguration);
185         if (endpoint == null) {
186         	return null;
187         }
188         return endpoint.getService();
189     }
190 
191     /**
192      * Because this is a queue we just need to grab one.
193      * 
194      * @param serviceInfo
195      * @return
196      */
197     protected Object getQueueService(ServiceConfiguration serviceConfiguration) {
198     	ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
199     	return serviceBus.getService(serviceConfiguration.getServiceName(), serviceConfiguration.getApplicationId());
200     }
201 
202     /**
203      * Used in case the thread that dumped this work into the queue is waiting
204      * for the work to be done to continue processing.
205      * 
206      * @param callback
207      */
208     protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) {
209         AsynchronousCallback callback = methodCall.getCallback();
210         notifyOnCallback(methodCall, callback, callResult);
211     }
212 
213     protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) {
214         if (LOG.isDebugEnabled()) {
215             LOG.debug("Notifying global callbacks");
216         }
217         for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) {
218             notifyOnCallback(methodCall, globalCallBack, callResult);
219         }
220     }
221 
222     protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) {
223         if (callback != null) {
224             try {
225                 synchronized (callback) {
226                     if (LOG.isDebugEnabled()) {
227                         LOG.debug("Notifying callback " + callback + " with callResult " + callResult);
228                     }
229                     callback.notifyAll();
230                     if (callResult instanceof Serializable || callResult == null) {
231                         callback.callback((Serializable) callResult, methodCall);
232                     } else {
233                         // may never happen
234                         LOG.warn("Attempted to call callback with non-serializable object.");
235                     }
236                 }
237             } catch (Throwable t) {
238                 LOG.error("Caught throwable from callback object " + callback.getClass(), t);
239             }
240         }
241     }
242 
243     public PersistedMessageBO getMessage() {
244         return this.message;
245     }
246 
247     public void setMessage(PersistedMessageBO message) {
248         this.message = message;
249     }
250 
251     public Object getService() {
252         return this.service;
253     }
254 
255     public AsynchronousCall getMethodCall() {
256         return this.methodCall;
257     }
258 
259     public void setMethodCall(AsynchronousCall methodCall) {
260         this.methodCall = methodCall;
261     }
262 
263     public void setService(Object service) {
264         this.service = service;
265     }
266 }