001    /**
002     * Copyright 2005-2012 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.ksb.messaging;
017    
018    import java.io.Serializable;
019    import java.lang.reflect.Method;
020    import java.sql.Timestamp;
021    
022    import javax.xml.namespace.QName;
023    
024    import org.apache.log4j.Logger;
025    import org.kuali.rice.core.api.config.property.ConfigContext;
026    import org.kuali.rice.core.api.exception.RiceRuntimeException;
027    import org.kuali.rice.ksb.api.KsbApiServiceLocator;
028    import org.kuali.rice.ksb.api.bus.Endpoint;
029    import org.kuali.rice.ksb.api.bus.ServiceBus;
030    import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
031    import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
032    import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
033    import org.kuali.rice.ksb.service.KSBServiceLocator;
034    import org.kuali.rice.ksb.util.KSBConstants;
035    import org.springframework.transaction.TransactionStatus;
036    import org.springframework.transaction.support.TransactionCallback;
037    
038    /**
039     * Handles invocation of a {@link PersistedMessageBO}.
040     * 
041     * @author Kuali Rice Team (rice.collab@kuali.org)
042     */
043    public class MessageServiceInvoker implements Runnable {
044    
045        protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class);
046    
047        private PersistedMessageBO message;
048        private Object service;
049        private AsynchronousCall methodCall;
050    
051        public MessageServiceInvoker(PersistedMessageBO message) {
052            this.message = message;
053        }
054    
055        public void run() {
056            LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId());
057            Object result = null;
058            try {
059                result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() {
060                    public Object doInTransaction(TransactionStatus status) {
061                        AsynchronousCall methodCall = getMessage().getPayload().getMethodCall();
062                        Object result = null;
063                        try {
064                            result = invokeService(methodCall);
065                            KSBServiceLocator.getMessageQueueService().delete(getMessage());
066                        } catch (Throwable t) {
067                            LOG.warn("Caught throwable making async service call " + methodCall, t);
068                            throw new MessageProcessingException(t);
069                        }
070                        return result;
071                    }
072                });
073            } catch (Throwable t) {
074                    // if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code
075                    // however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well
076                    boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING));
077                    if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
078                            if (t instanceof RuntimeException) {
079                                    throw (RuntimeException)t;
080                            }
081                            throw new RiceRuntimeException(t);
082                    } else {
083                            placeInExceptionRouting(t, getMethodCall(), getService());
084                    }
085            } finally {
086                try {
087                    notifyOnCallback(methodCall, result);
088                } catch (Exception e) {
089                    LOG.warn("Exception caught notifying callback", e);
090                }
091                try {
092                    notifyGlobalCallbacks(methodCall, result);
093                } catch (Exception e) {
094                    LOG.warn("Exception caught notifying callback", e);
095                }
096            }
097        }
098    
099        /**
100         * Executed when an exception is encountered during message invocation.
101         * Attempts to call the ExceptionHandler for the message, if that fails it
102         * will attempt to set the status of the message in the queue to
103         * "EXCEPTION".
104         */
105        protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) {
106            LOG.error("Error processing message: " + this.message, t);
107            final Throwable throwable;
108            if (t instanceof MessageProcessingException) {
109                throwable = t.getCause();
110            } else {
111                throwable = t;
112            }
113            try {
114                    try {
115                            KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service);
116                    } catch (Throwable t1) {
117                            KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service);
118                    }
119            } catch (Throwable t2) {
120                LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2);
121                message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
122                message.setQueueDate(new Timestamp(System.currentTimeMillis()));
123                try {
124                    KSBServiceLocator.getMessageQueueService().save(message);
125                } catch (Throwable t3) {
126                    LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3);
127                }
128            }
129        }
130    
131        /**
132         * Invokes the AsynchronousCall represented on the methodCall on the service
133         * contained in the ServiceInfo object on the AsynchronousCall.
134         * 
135         */
136        protected Object invokeService(AsynchronousCall methodCall) throws Exception {
137            this.methodCall = methodCall;
138            ServiceConfiguration serviceConfiguration = methodCall.getServiceConfiguration();
139            QName serviceName = serviceConfiguration.getServiceName();
140            if (LOG.isDebugEnabled()) {
141                LOG.debug("Attempting to call service " + serviceName);
142            }
143    
144            Object service = getService(serviceConfiguration);
145            if (service == null) {
146                    throw new RiceRuntimeException("Failed to locate service endpoint for message: " + serviceConfiguration);
147            }
148            Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes());
149            return method.invoke(service, methodCall.getArguments());
150        }
151    
152        protected Object getService(ServiceConfiguration serviceConfiguration) {
153            Object service;
154            if (serviceConfiguration.isQueue()) {
155                service = getQueueService(serviceConfiguration);
156            } else {
157                service = getTopicService(serviceConfiguration);
158            }
159            return service;
160        }
161    
162        /**
163         * Get the service as a topic. This means we want to contact every service
164         * that is a part of this topic. We've grabbed all the services that are a
165         * part of this topic and we want to make sure that we get everyone of them =
166         * that is we want to circumvent loadbalancing and therefore not ask for the
167         * service by it's name but the url to get the exact service we want.
168         * 
169         * @param serviceInfo
170         * @return
171         */
172        protected Object getTopicService(ServiceConfiguration serviceConfiguration) {
173            // get the service locally if we have it so we don't go through any
174            // remoting
175            ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
176            Endpoint endpoint = serviceBus.getConfiguredEndpoint(serviceConfiguration);
177            if (endpoint == null) {
178                    return null;
179            }
180            return endpoint.getService();
181        }
182    
183        /**
184         * Because this is a queue we just need to grab one.
185         * 
186         * @param serviceInfo
187         * @return
188         */
189        protected Object getQueueService(ServiceConfiguration serviceConfiguration) {
190            ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
191            return serviceBus.getService(serviceConfiguration.getServiceName());
192        }
193    
194        /**
195         * Used in case the thread that dumped this work into the queue is waiting
196         * for the work to be done to continue processing.
197         * 
198         * @param callback
199         */
200        protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) {
201            AsynchronousCallback callback = methodCall.getCallback();
202            notifyOnCallback(methodCall, callback, callResult);
203        }
204    
205        protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) {
206            if (LOG.isDebugEnabled()) {
207                LOG.debug("Notifying global callbacks");
208            }
209            for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) {
210                notifyOnCallback(methodCall, globalCallBack, callResult);
211            }
212        }
213    
214        protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) {
215            if (callback != null) {
216                try {
217                    synchronized (callback) {
218                        if (LOG.isDebugEnabled()) {
219                            LOG.debug("Notifying callback " + callback + " with callResult " + callResult);
220                        }
221                        callback.notifyAll();
222                        if (callResult instanceof Serializable || callResult == null) {
223                            callback.callback((Serializable) callResult, methodCall);
224                        } else {
225                            // may never happen
226                            LOG.warn("Attempted to call callback with non-serializable object.");
227                        }
228                    }
229                } catch (Throwable t) {
230                    LOG.error("Caught throwable from callback object " + callback.getClass(), t);
231                }
232            }
233        }
234    
235        public PersistedMessageBO getMessage() {
236            return this.message;
237        }
238    
239        public void setMessage(PersistedMessageBO message) {
240            this.message = message;
241        }
242    
243        public Object getService() {
244            return this.service;
245        }
246    
247        public AsynchronousCall getMethodCall() {
248            return this.methodCall;
249        }
250    
251        public void setMethodCall(AsynchronousCall methodCall) {
252            this.methodCall = methodCall;
253        }
254    
255        public void setService(Object service) {
256            this.service = service;
257        }
258    }