001    /**
002     * Copyright 2005-2012 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.ksb.messaging.web;
017    
018    import org.apache.commons.collections.comparators.ComparableComparator;
019    import org.apache.commons.lang.StringUtils;
020    import org.apache.commons.lang.math.NumberUtils;
021    import org.apache.struts.action.ActionForm;
022    import org.apache.struts.action.ActionForward;
023    import org.apache.struts.action.ActionMapping;
024    import org.apache.struts.action.ActionMessage;
025    import org.apache.struts.action.ActionMessages;
026    import org.kuali.rice.core.api.config.CoreConfigHelper;
027    import org.kuali.rice.core.api.config.property.ConfigContext;
028    import org.kuali.rice.core.api.util.ConcreteKeyValue;
029    import org.kuali.rice.core.api.util.RiceConstants;
030    import org.kuali.rice.core.api.util.RiceUtilities;
031    import org.kuali.rice.core.api.util.io.SerializationUtils;
032    import org.kuali.rice.ksb.api.KsbApiServiceLocator;
033    import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
034    import org.kuali.rice.ksb.api.registry.ServiceInfo;
035    import org.kuali.rice.ksb.messaging.MessageFetcher;
036    import org.kuali.rice.ksb.messaging.MessageServiceInvoker;
037    import org.kuali.rice.ksb.messaging.PersistedMessageBO;
038    import org.kuali.rice.ksb.messaging.service.MessageQueueService;
039    import org.kuali.rice.ksb.service.KSBServiceLocator;
040    import org.kuali.rice.ksb.util.KSBConstants;
041    
042    import javax.servlet.ServletException;
043    import javax.servlet.http.HttpServletRequest;
044    import javax.servlet.http.HttpServletResponse;
045    import java.io.IOException;
046    import java.sql.Timestamp;
047    import java.util.ArrayList;
048    import java.util.Calendar;
049    import java.util.Collections;
050    import java.util.Comparator;
051    import java.util.Date;
052    import java.util.HashMap;
053    import java.util.Iterator;
054    import java.util.List;
055    import java.util.Map;
056    
057    
058    /**
059     * Struts action for interacting with the queue of messages.
060     *
061     * @author Kuali Rice Team (rice.collab@kuali.org)
062     */
063    public class MessageQueueAction extends KSBAction {
064    
065        @Override
066            public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request,
067                HttpServletResponse response) throws IOException, ServletException {
068            return mapping.findForward("report");
069        }
070    
071        public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request,
072                HttpServletResponse response) throws Exception {
073            MessageQueueForm routeQueueForm = (MessageQueueForm) form;
074            save(routeQueueForm);
075    
076            routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
077            ActionMessages messages = new ActionMessages();
078            messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved"));
079            saveMessages(request, messages);
080    
081    //      routeQueueForm.setMessageId(null);
082    ////    routeQueueForm.setMessageQueueFromDatabase(null);
083    ////    routeQueueForm.setMessageQueueFromForm(null);
084    //      routeQueueForm.setShowEdit("yes");
085    //      routeQueueForm.setMethodToCall("");
086    //      establishRequiredState(request, form);
087    //      routeQueueForm.setMessageId(routeQueueId);
088    ////    routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
089    //      routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
090    //      routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
091            return mapping.findForward("report");
092        }
093    
094        public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
095                HttpServletResponse response) throws Exception {
096            MessageQueueForm routeQueueForm = (MessageQueueForm) form;
097            PersistedMessageBO message = save(routeQueueForm);
098            KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
099    
100            ActionMessages messages = new ActionMessages();
101            messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
102            saveMessages(request, messages);
103    
104            routeQueueForm.setMessageId(null);
105            routeQueueForm.setMessageQueueFromDatabase(null);
106            routeQueueForm.setMessageQueueFromForm(null);
107            routeQueueForm.setShowEdit("yes");
108            routeQueueForm.setMethodToCall("");
109            establishRequiredState(request, form);
110            routeQueueForm.setMessageId(message.getRouteQueueId());
111            routeQueueForm.setMessageQueueFromForm(message);
112            routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
113            routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
114            return mapping.findForward("report");
115        }
116    
117    //    public ActionForward saveAndForward(ActionMapping mapping, ActionForm form, HttpServletRequest request,
118    //          HttpServletResponse response) throws Exception {
119    //      MessageQueueForm routeQueueForm = (MessageQueueForm) form;
120    //      PersistedMessageBO message = save(routeQueueForm);
121    //      ForwardedCallHandler adminService = getAdminServiceToForwardTo(message, routeQueueForm);
122    //      AsynchronousCall methodCall = message.getPayload().getMethodCall();
123    //      message.setMethodCall(methodCall);
124    //      adminService.handleCall(message);
125    //      KSBServiceLocator.getRouteQueueService().delete(message);
126    //
127    //      ActionMessages messages = new ActionMessages();
128    //      messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
129    //      saveMessages(request, messages);
130    //
131    //      routeQueueForm.setMessageId(null);
132    //      routeQueueForm.setMessageQueueFromDatabase(null);
133    //      routeQueueForm.setMessageQueueFromForm(null);
134    //      routeQueueForm.setShowEdit("yes");
135    //      routeQueueForm.setMethodToCall("");
136    //      establishRequiredState(request, form);
137    //      routeQueueForm.setMessageId(message.getRouteQueueId());
138    //      routeQueueForm.setMessageQueueFromForm(message);
139    //      routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
140    //      routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
141    //      return mapping.findForward("report");
142    //    }
143    
144        private PersistedMessageBO save(MessageQueueForm routeQueueForm) {
145            Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
146            if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
147                throw new IllegalArgumentException("Invalid routeQueueId passed in.  Cannot save");
148            }
149            // save the message
150            PersistedMessageBO existingMessage = KSBServiceLocator.getMessageQueueService().findByRouteQueueId(routeQueueId);
151            PersistedMessageBO message = routeQueueForm.getMessageQueueFromForm();
152            // copy the new values over
153            if (existingMessage == null) {
154                throw new RuntimeException("Could locate the existing message, it may have already been processed.");
155            }
156    
157            existingMessage.setQueuePriority(message.getQueuePriority());
158            existingMessage.setIpNumber(message.getIpNumber());
159            existingMessage.setLockVerNbr(message.getLockVerNbr());
160            existingMessage.setApplicationId(message.getApplicationId());
161            existingMessage.setMethodName(message.getMethodName());
162            existingMessage.setQueueStatus(message.getQueueStatus());
163            existingMessage.setRetryCount(message.getRetryCount());
164            existingMessage.setServiceName(message.getServiceName());
165            existingMessage.setValue1(message.getValue1());
166            existingMessage.setValue2(message.getValue2());
167            KSBServiceLocator.getMessageQueueService().save(existingMessage);
168            return existingMessage;
169        }
170    
171        /**
172             * Performs a quick ReQueue of the indicated persisted message.
173             *
174             * The net effect of this requeue is to set the Date to now, and to reset the RetryCount to zero. The payload is not
175             * modified.
176             *
177             * @param message
178             *                The populated message to be requeued.
179             */
180        protected void quickRequeueMessage(PersistedMessageBO message) {
181            message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
182            message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis()));
183            message.setRetryCount(new Integer(0));
184            getRouteQueueService().save(message);
185        }
186    
187        public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request,
188                HttpServletResponse response) throws Exception {
189            MessageQueueForm routeQueueForm = (MessageQueueForm) form;
190            if (routeQueueForm.getMessageQueueFromDatabase() == null) {
191                throw new IllegalArgumentException("No messageId passed in with the Request.");
192            }
193    
194            PersistedMessageBO message = routeQueueForm.getMessageQueueFromDatabase();
195            quickRequeueMessage(message);
196            KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
197    
198            ActionMessages messages = new ActionMessages();
199            messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued"));
200            saveMessages(request, messages);
201    
202            routeQueueForm.setMessageQueueFromDatabase(null);
203            routeQueueForm.setMessageQueueFromForm(null);
204            routeQueueForm.setMessageId(null);
205            routeQueueForm.setMethodToCall("");
206    
207            // re-run the state method to load the full set of rows
208            establishRequiredState(request, form);
209            return mapping.findForward("report");
210        }
211    
212        public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
213                HttpServletResponse response) throws IOException, ServletException {
214            MessageQueueForm routeQueueForm = (MessageQueueForm) form;
215            routeQueueForm.setShowEdit("yes");
216            routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
217            routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
218            routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
219            return mapping.findForward("basic");
220        }
221    
222        public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request,
223                HttpServletResponse response) throws Exception {
224            MessageQueueForm routeQueueForm = (MessageQueueForm) form;
225            routeQueueForm.setShowEdit("no");
226            routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
227            routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
228            AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase());
229            routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload);
230            return mapping.findForward("payload");
231        }
232    
233        public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request,
234                HttpServletResponse response) throws Exception {
235            MessageQueueForm routeQueueForm = (MessageQueueForm) form;
236            if (routeQueueForm.getShowEdit().equals("yes")) {
237                routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
238            }
239            return mapping.findForward("basic");
240        }
241    
242        public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request,
243                HttpServletResponse response) throws Exception {
244            MessageQueueForm routeQueueForm = (MessageQueueForm) form;
245            routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
246            routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
247            routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
248            routeQueueForm.getMessageQueueFromForm().setExpirationDate(null);
249            routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
250            routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
251            routeQueueForm.getMessageQueueFromForm().setServiceName(null);
252            routeQueueForm.getMessageQueueFromForm().setApplicationId(null);
253            routeQueueForm.getMessageQueueFromForm().setMethodName(null);
254            routeQueueForm.getMessageQueueFromForm().setPayload(null);
255            routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
256            routeQueueForm.setExistingQueueDate(null);
257            routeQueueForm.setNewQueueDate(null);
258            return mapping.findForward("basic");
259        }
260    
261        public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request,
262                HttpServletResponse response) throws Exception {
263            MessageQueueForm routeQueueForm = (MessageQueueForm) form;
264            routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
265            routeQueueForm.setMessageQueueFromDatabase(null);
266            getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm());
267            ActionMessages messages = new ActionMessages();
268            messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm
269                    .getMessageQueueFromForm().getRouteQueueId().toString()));
270            saveMessages(request, messages);
271            routeQueueForm.setMessageId(null);
272            establishRequiredState(request, form);
273            return mapping.findForward("report");
274        }
275    
276        public ActionForward executeMessageFetcher(ActionMapping mapping,  ActionForm form, HttpServletRequest request,
277                HttpServletResponse response) throws Exception {
278            MessageQueueForm routeQueueForm = (MessageQueueForm) form;
279            ActionMessages messages = new ActionMessages();
280            if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
281                messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages()));
282            }
283            if (!messages.isEmpty()) {
284                saveMessages(request, messages);
285                return mapping.findForward("report");
286            }
287            new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run();
288            return mapping.findForward("report");
289        }
290    
291        /**
292             * Sets up the expected state by retrieving the selected RouteQueue by RouteQueueId, and placing it in the
293             * ExistingRouteQueue member.
294             *
295             * Called by the super's Execute method on every request.
296             */
297        @Override
298            public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception {
299            request.setAttribute("rice_constant", getServlet().getServletContext().getAttribute("RiceConstants"));
300            request.setAttribute("ksb_constant", getServlet().getServletContext().getAttribute("KSBConstants"));
301            MessageQueueForm routeQueueForm = (MessageQueueForm) form;
302            routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
303            routeQueueForm.setMyApplicationId(CoreConfigHelper.getApplicationId());
304            routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE));
305            routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
306            routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGING_OFF));
307            List<ServiceInfo> services = KsbApiServiceLocator.getServiceRegistry().getAllOnlineServices();
308            if (routeQueueForm.getMessageId() != null) {
309                PersistedMessageBO rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId());
310                if (rq != null) {
311                    routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date()));
312                    routeQueueForm.setMessageQueueFromDatabase(rq);
313                    // establish IP addresses where this message could safely be forwarded to
314                    String serviceName = rq.getServiceName();
315                    for (ServiceInfo serviceInfo : services) {
316                        if (serviceInfo.getServiceName().equals(serviceName)) {
317                            routeQueueForm.getIpAddresses().add(
318                                    new ConcreteKeyValue(serviceInfo.getServerIpAddress(), serviceInfo.getServerIpAddress()));
319                        }
320                    }
321                } else {
322                    ActionMessages messages = new ActionMessages();
323                    messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
324                            "messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString()));
325                    return messages;
326                }
327                routeQueueForm.setMessageId(null);
328            } else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) {
329                List<PersistedMessageBO> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1);
330                if (queueEntries.size() > 0) {
331                    Collections.sort(queueEntries, new Comparator() {
332                        private Comparator comp = new ComparableComparator();
333    
334                        @Override
335                            public int compare(Object object1, Object object2) {
336                            if (object1 == null && object2 == null) {
337                                return 0;
338                            } else if (object1 == null) {
339                                return 1;
340                            } else if (object2 == null) {
341                                return -1;
342                            }
343                            Long id1 = ((PersistedMessageBO) object1).getRouteQueueId();
344                            Long id2 = ((PersistedMessageBO) object2).getRouteQueueId();
345    
346                            try {
347                                return this.comp.compare(id1, id2);
348                            } catch (Exception e) {
349                                return 0;
350                            }
351                        }
352                    });
353                }
354                routeQueueForm.setMessageQueueRows(queueEntries);
355            }
356            return null;
357        }
358    
359        protected List<PersistedMessageBO> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) {
360            List<PersistedMessageBO> routeQueues = new ArrayList<PersistedMessageBO>();
361    
362            // no filter applied
363            if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
364                routeQueues.addAll(getRouteQueueService().findAll(maxRows));
365            }
366    
367            // one or more filters applied
368            else {
369                if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) {
370                    if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) {
371                        // TODO better error handling here
372                        throw new RuntimeException("Message Id must be a number.");
373                    }
374                }
375    
376                Map<String, String> criteriaValues = new HashMap<String, String>();
377                String key = null;
378                String value = null;
379                String trimmedKey = null;
380                for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) {
381                    key = (String) iter.next();
382                    if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
383                        value = request.getParameter(key);
384                        if (StringUtils.isNotBlank(value)) {
385                            trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX));
386                            criteriaValues.put(trimmedKey, value);
387                        }
388                    }
389                }
390                routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows));
391            }
392            return routeQueues;
393        }
394    
395        private MessageQueueService getRouteQueueService() {
396            return KSBServiceLocator.getMessageQueueService();
397        }
398    
399        /**
400             * Extracts the payload from a PersistedMessageBO, attempts to convert it to the expected AsynchronousCall type, and
401             * returns it.
402             *
403             * Throws an IllegalArgumentException if the decoded payload isnt of the expected type.
404             *
405             * @param message
406             *                The populated PersistedMessageBO object to extract the payload from.
407             * @return Returns the payload if one is present and it can be deserialized, otherwise returns null.
408             */
409        protected AsynchronousCall unwrapPayload(PersistedMessageBO message) {
410            if (message == null || message.getPayload() == null) {
411                return null;
412            }
413            String encodedPayload = message.getPayload().getPayload();
414            if (StringUtils.isBlank(encodedPayload)) {
415                return null;
416            }
417            Object decodedPayload = null;
418            if (encodedPayload != null) {
419                decodedPayload = SerializationUtils.deserializeFromBase64(encodedPayload);
420            }
421            // fail fast if its not the expected type of AsynchronousCall
422            if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) {
423                throw new IllegalArgumentException("PersistedMessageBO payload was not of the expected class. " + "Expected was ["
424                        + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]");
425            }
426            return (AsynchronousCall) decodedPayload;
427        }
428    
429    }