001    /**
002     * Copyright 2005-2012 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.ksb.messaging;
017    
018    import java.io.Serializable;
019    import java.lang.reflect.Method;
020    import java.sql.Timestamp;
021    
022    import javax.xml.namespace.QName;
023    
024    import org.apache.log4j.Logger;
025    import org.kuali.rice.core.api.config.property.Config;
026    import org.kuali.rice.core.api.config.property.ConfigContext;
027    import org.kuali.rice.core.api.exception.RiceRuntimeException;
028    import org.kuali.rice.ksb.api.KsbApiServiceLocator;
029    import org.kuali.rice.ksb.api.bus.Endpoint;
030    import org.kuali.rice.ksb.api.bus.ServiceBus;
031    import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
032    import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
033    import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
034    import org.kuali.rice.ksb.service.KSBServiceLocator;
035    import org.kuali.rice.ksb.util.KSBConstants;
036    import org.springframework.transaction.TransactionStatus;
037    import org.springframework.transaction.support.TransactionCallback;
038    
039    /**
040     * Handles invocation of a {@link PersistedMessageBO}.
041     * 
042     * @author Kuali Rice Team (rice.collab@kuali.org)
043     */
044    public class MessageServiceInvoker implements Runnable {
045    
046        protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class);
047    
048        private PersistedMessageBO message;
049        private Object service;
050        private AsynchronousCall methodCall;
051    
052        public MessageServiceInvoker(PersistedMessageBO message) {
053            this.message = message;
054        }
055    
056        public void run() {
057            LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId());
058            if(ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.MESSAGE_PERSISTENCE)) {
059                PersistedMessagePayload messageFromDB = KSBServiceLocator.getMessageQueueService().findByPersistedMessageByRouteQueueId(getMessage().getRouteQueueId());
060                if(messageFromDB == null) {
061                    // If the message is no longer found in the database we should skip this processing
062                    return;
063                }
064            }
065            Object result = null;
066            try {
067                result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() {
068                    public Object doInTransaction(TransactionStatus status) {
069                        AsynchronousCall methodCall = getMessage().getPayload().getMethodCall();
070                        Object result = null;
071                        try {
072                            result = invokeService(methodCall);
073                            KSBServiceLocator.getMessageQueueService().delete(getMessage());
074                        } catch (Throwable t) {
075                            LOG.warn("Caught throwable making async service call " + methodCall, t);
076                            throw new MessageProcessingException(t);
077                        }
078                        return result;
079                    }
080                });
081            } catch (Throwable t) {
082                    // if we are in synchronous mode, we can't put the message into exception routing, let's instead throw the error up to the calling code
083                    // however, for the purposes of the unit tests, even when in synchronous mode, we want to call the exception routing service, so check a parameter for that as well
084                    boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING));
085                    if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
086                            if (t instanceof RuntimeException) {
087                                    throw (RuntimeException)t;
088                            }
089                            throw new RiceRuntimeException(t);
090                    } else {
091                            placeInExceptionRouting(t, getMethodCall(), getService());
092                    }
093            } finally {
094                try {
095                    notifyOnCallback(methodCall, result);
096                } catch (Exception e) {
097                    LOG.warn("Exception caught notifying callback", e);
098                }
099                try {
100                    notifyGlobalCallbacks(methodCall, result);
101                } catch (Exception e) {
102                    LOG.warn("Exception caught notifying callback", e);
103                }
104            }
105        }
106    
107        /**
108         * Executed when an exception is encountered during message invocation.
109         * Attempts to call the ExceptionHandler for the message, if that fails it
110         * will attempt to set the status of the message in the queue to
111         * "EXCEPTION".
112         */
113        protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) {
114            LOG.error("Error processing message: " + this.message, t);
115            final Throwable throwable;
116            if (t instanceof MessageProcessingException) {
117                throwable = t.getCause();
118            } else {
119                throwable = t;
120            }
121            try {
122                    try {
123                            KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service);
124                    } catch (Throwable t1) {
125                            KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service);
126                    }
127            } catch (Throwable t2) {
128                LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2);
129                message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
130                message.setQueueDate(new Timestamp(System.currentTimeMillis()));
131                try {
132                    KSBServiceLocator.getMessageQueueService().save(message);
133                } catch (Throwable t3) {
134                    LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3);
135                }
136            }
137        }
138    
139        /**
140         * Invokes the AsynchronousCall represented on the methodCall on the service
141         * contained in the ServiceInfo object on the AsynchronousCall.
142         * 
143         */
144        protected Object invokeService(AsynchronousCall methodCall) throws Exception {
145            this.methodCall = methodCall;
146            ServiceConfiguration serviceConfiguration = methodCall.getServiceConfiguration();
147            QName serviceName = serviceConfiguration.getServiceName();
148            if (LOG.isDebugEnabled()) {
149                LOG.debug("Attempting to call service " + serviceName);
150            }
151    
152            Object service = getService(serviceConfiguration);
153            if (service == null) {
154                    throw new RiceRuntimeException("Failed to locate service endpoint for message: " + serviceConfiguration);
155            }
156            Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes());
157            return method.invoke(service, methodCall.getArguments());
158        }
159    
160        protected Object getService(ServiceConfiguration serviceConfiguration) {
161            Object service;
162            if (serviceConfiguration.isQueue()) {
163                service = getQueueService(serviceConfiguration);
164            } else {
165                service = getTopicService(serviceConfiguration);
166            }
167            return service;
168        }
169    
170        /**
171         * Get the service as a topic. This means we want to contact every service
172         * that is a part of this topic. We've grabbed all the services that are a
173         * part of this topic and we want to make sure that we get everyone of them =
174         * that is we want to circumvent loadbalancing and therefore not ask for the
175         * service by it's name but the url to get the exact service we want.
176         * 
177         * @param serviceInfo
178         * @return
179         */
180        protected Object getTopicService(ServiceConfiguration serviceConfiguration) {
181            // get the service locally if we have it so we don't go through any
182            // remoting
183            ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
184            Endpoint endpoint = serviceBus.getConfiguredEndpoint(serviceConfiguration);
185            if (endpoint == null) {
186                    return null;
187            }
188            return endpoint.getService();
189        }
190    
191        /**
192         * Because this is a queue we just need to grab one.
193         * 
194         * @param serviceInfo
195         * @return
196         */
197        protected Object getQueueService(ServiceConfiguration serviceConfiguration) {
198            ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
199            return serviceBus.getService(serviceConfiguration.getServiceName(), serviceConfiguration.getApplicationId());
200        }
201    
202        /**
203         * Used in case the thread that dumped this work into the queue is waiting
204         * for the work to be done to continue processing.
205         * 
206         * @param callback
207         */
208        protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) {
209            AsynchronousCallback callback = methodCall.getCallback();
210            notifyOnCallback(methodCall, callback, callResult);
211        }
212    
213        protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) {
214            if (LOG.isDebugEnabled()) {
215                LOG.debug("Notifying global callbacks");
216            }
217            for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) {
218                notifyOnCallback(methodCall, globalCallBack, callResult);
219            }
220        }
221    
222        protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) {
223            if (callback != null) {
224                try {
225                    synchronized (callback) {
226                        if (LOG.isDebugEnabled()) {
227                            LOG.debug("Notifying callback " + callback + " with callResult " + callResult);
228                        }
229                        callback.notifyAll();
230                        if (callResult instanceof Serializable || callResult == null) {
231                            callback.callback((Serializable) callResult, methodCall);
232                        } else {
233                            // may never happen
234                            LOG.warn("Attempted to call callback with non-serializable object.");
235                        }
236                    }
237                } catch (Throwable t) {
238                    LOG.error("Caught throwable from callback object " + callback.getClass(), t);
239                }
240            }
241        }
242    
243        public PersistedMessageBO getMessage() {
244            return this.message;
245        }
246    
247        public void setMessage(PersistedMessageBO message) {
248            this.message = message;
249        }
250    
251        public Object getService() {
252            return this.service;
253        }
254    
255        public AsynchronousCall getMethodCall() {
256            return this.methodCall;
257        }
258    
259        public void setMethodCall(AsynchronousCall methodCall) {
260            this.methodCall = methodCall;
261        }
262    
263        public void setService(Object service) {
264            this.service = service;
265        }
266    }