001/**
002 * Copyright 2005-2015 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.kuali.rice.ksb.messaging.web;
017
018import org.apache.commons.collections.comparators.ComparableComparator;
019import org.apache.commons.lang.StringUtils;
020import org.apache.commons.lang.math.NumberUtils;
021import org.apache.struts.action.ActionForm;
022import org.apache.struts.action.ActionForward;
023import org.apache.struts.action.ActionMapping;
024import org.apache.struts.action.ActionMessage;
025import org.apache.struts.action.ActionMessages;
026import org.kuali.rice.core.api.config.CoreConfigHelper;
027import org.kuali.rice.core.api.config.property.ConfigContext;
028import org.kuali.rice.core.api.util.ConcreteKeyValue;
029import org.kuali.rice.core.api.util.RiceConstants;
030import org.kuali.rice.core.api.util.RiceUtilities;
031import org.kuali.rice.core.api.util.io.SerializationUtils;
032import org.kuali.rice.ksb.api.KsbApiServiceLocator;
033import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
034import org.kuali.rice.ksb.api.registry.ServiceInfo;
035import org.kuali.rice.ksb.messaging.MessageFetcher;
036import org.kuali.rice.ksb.messaging.MessageServiceInvoker;
037import org.kuali.rice.ksb.messaging.PersistedMessageBO;
038import org.kuali.rice.ksb.messaging.service.MessageQueueService;
039import org.kuali.rice.ksb.service.KSBServiceLocator;
040import org.kuali.rice.ksb.util.KSBConstants;
041
042import javax.servlet.ServletException;
043import javax.servlet.http.HttpServletRequest;
044import javax.servlet.http.HttpServletResponse;
045import java.io.IOException;
046import java.sql.Timestamp;
047import java.util.ArrayList;
048import java.util.Calendar;
049import java.util.Collections;
050import java.util.Comparator;
051import java.util.Date;
052import java.util.HashMap;
053import java.util.Iterator;
054import java.util.List;
055import java.util.Map;
056
057
058/**
059 * Struts action for interacting with the queue of messages.
060 *
061 * @author Kuali Rice Team (rice.collab@kuali.org)
062 */
063public class MessageQueueAction extends KSBAction {
064
065    @Override
066        public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request,
067            HttpServletResponse response) throws IOException, ServletException {
068        return mapping.findForward("report");
069    }
070
071    public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request,
072            HttpServletResponse response) throws Exception {
073        MessageQueueForm routeQueueForm = (MessageQueueForm) form;
074        save(routeQueueForm);
075
076        routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
077        ActionMessages messages = new ActionMessages();
078        messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved"));
079        saveMessages(request, messages);
080
081//      routeQueueForm.setMessageId(null);
082////    routeQueueForm.setMessageQueueFromDatabase(null);
083////    routeQueueForm.setMessageQueueFromForm(null);
084//      routeQueueForm.setShowEdit("yes");
085//      routeQueueForm.setMethodToCall("");
086//      establishRequiredState(request, form);
087//      routeQueueForm.setMessageId(routeQueueId);
088////    routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
089//      routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
090//      routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
091        return mapping.findForward("report");
092    }
093
094    public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
095            HttpServletResponse response) throws Exception {
096        MessageQueueForm routeQueueForm = (MessageQueueForm) form;
097        PersistedMessageBO message = save(routeQueueForm);
098        KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
099
100        ActionMessages messages = new ActionMessages();
101        messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
102        saveMessages(request, messages);
103
104        routeQueueForm.setMessageId(null);
105        routeQueueForm.setMessageQueueFromDatabase(null);
106        routeQueueForm.setMessageQueueFromForm(null);
107        routeQueueForm.setShowEdit("yes");
108        routeQueueForm.setMethodToCall("");
109        establishRequiredState(request, form);
110        routeQueueForm.setMessageId(message.getRouteQueueId());
111        routeQueueForm.setMessageQueueFromForm(message);
112        routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
113        routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
114        return mapping.findForward("report");
115    }
116
117//    public ActionForward saveAndForward(ActionMapping mapping, ActionForm form, HttpServletRequest request,
118//          HttpServletResponse response) throws Exception {
119//      MessageQueueForm routeQueueForm = (MessageQueueForm) form;
120//      PersistedMessageBO message = save(routeQueueForm);
121//      ForwardedCallHandler adminService = getAdminServiceToForwardTo(message, routeQueueForm);
122//      AsynchronousCall methodCall = message.getPayload().getMethodCall();
123//      message.setMethodCall(methodCall);
124//      adminService.handleCall(message);
125//      KSBServiceLocator.getRouteQueueService().delete(message);
126//
127//      ActionMessages messages = new ActionMessages();
128//      messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
129//      saveMessages(request, messages);
130//
131//      routeQueueForm.setMessageId(null);
132//      routeQueueForm.setMessageQueueFromDatabase(null);
133//      routeQueueForm.setMessageQueueFromForm(null);
134//      routeQueueForm.setShowEdit("yes");
135//      routeQueueForm.setMethodToCall("");
136//      establishRequiredState(request, form);
137//      routeQueueForm.setMessageId(message.getRouteQueueId());
138//      routeQueueForm.setMessageQueueFromForm(message);
139//      routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
140//      routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
141//      return mapping.findForward("report");
142//    }
143
144    private PersistedMessageBO save(MessageQueueForm routeQueueForm) {
145        Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
146        if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
147            throw new IllegalArgumentException("Invalid routeQueueId passed in.  Cannot save");
148        }
149        // save the message
150        PersistedMessageBO existingMessage = KSBServiceLocator.getMessageQueueService().findByRouteQueueId(routeQueueId);
151        PersistedMessageBO message = routeQueueForm.getMessageQueueFromForm();
152        // copy the new values over
153        if (existingMessage == null) {
154            throw new RuntimeException("Could locate the existing message, it may have already been processed.");
155        }
156
157        existingMessage.setQueuePriority(message.getQueuePriority());
158        existingMessage.setIpNumber(message.getIpNumber());
159        existingMessage.setLockVerNbr(message.getLockVerNbr());
160        existingMessage.setApplicationId(message.getApplicationId());
161        existingMessage.setMethodName(message.getMethodName());
162        existingMessage.setQueueStatus(message.getQueueStatus());
163        existingMessage.setRetryCount(message.getRetryCount());
164        existingMessage.setServiceName(message.getServiceName());
165        existingMessage.setValue1(message.getValue1());
166        existingMessage.setValue2(message.getValue2());
167        KSBServiceLocator.getMessageQueueService().save(existingMessage);
168        return existingMessage;
169    }
170
171    /**
172         * Performs a quick ReQueue of the indicated persisted message.
173         *
174         * The net effect of this requeue is to set the Date to now, and to reset the RetryCount to zero. The payload is not
175         * modified.
176         *
177         * @param message
178         *                The populated message to be requeued.
179         */
180    protected void quickRequeueMessage(PersistedMessageBO message) {
181        message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
182        message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis()));
183        message.setRetryCount(new Integer(0));
184        getRouteQueueService().save(message);
185    }
186
187    public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request,
188            HttpServletResponse response) throws Exception {
189        MessageQueueForm routeQueueForm = (MessageQueueForm) form;
190        if (routeQueueForm.getMessageQueueFromDatabase() == null) {
191            throw new IllegalArgumentException("No messageId passed in with the Request.");
192        }
193
194        PersistedMessageBO message = routeQueueForm.getMessageQueueFromDatabase();
195        quickRequeueMessage(message);
196        KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
197
198        ActionMessages messages = new ActionMessages();
199        messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued"));
200        saveMessages(request, messages);
201
202        routeQueueForm.setMessageQueueFromDatabase(null);
203        routeQueueForm.setMessageQueueFromForm(null);
204        routeQueueForm.setMessageId(null);
205        routeQueueForm.setMethodToCall("");
206
207        // re-run the state method to load the full set of rows
208        establishRequiredState(request, form);
209        return mapping.findForward("report");
210    }
211
212    public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
213            HttpServletResponse response) throws IOException, ServletException {
214        MessageQueueForm routeQueueForm = (MessageQueueForm) form;
215        routeQueueForm.setShowEdit("yes");
216        routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
217        routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
218        routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
219        return mapping.findForward("basic");
220    }
221
222    public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request,
223            HttpServletResponse response) throws Exception {
224        MessageQueueForm routeQueueForm = (MessageQueueForm) form;
225        routeQueueForm.setShowEdit("no");
226        routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
227        routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
228        AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase());
229        routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload);
230        return mapping.findForward("payload");
231    }
232
233    public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request,
234            HttpServletResponse response) throws Exception {
235        MessageQueueForm routeQueueForm = (MessageQueueForm) form;
236        if (routeQueueForm.getShowEdit().equals("yes")) {
237            routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
238        }
239        return mapping.findForward("basic");
240    }
241
242    public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request,
243            HttpServletResponse response) throws Exception {
244        MessageQueueForm routeQueueForm = (MessageQueueForm) form;
245        routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
246        routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
247        routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
248        routeQueueForm.getMessageQueueFromForm().setExpirationDate(null);
249        routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
250        routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
251        routeQueueForm.getMessageQueueFromForm().setServiceName(null);
252        routeQueueForm.getMessageQueueFromForm().setApplicationId(null);
253        routeQueueForm.getMessageQueueFromForm().setMethodName(null);
254        routeQueueForm.getMessageQueueFromForm().setPayload(null);
255        routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
256        routeQueueForm.setExistingQueueDate(null);
257        routeQueueForm.setNewQueueDate(null);
258        return mapping.findForward("basic");
259    }
260
261    public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request,
262            HttpServletResponse response) throws Exception {
263        MessageQueueForm routeQueueForm = (MessageQueueForm) form;
264        routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
265        routeQueueForm.setMessageQueueFromDatabase(null);
266        getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm());
267        ActionMessages messages = new ActionMessages();
268        messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm
269                .getMessageQueueFromForm().getRouteQueueId().toString()));
270        saveMessages(request, messages);
271        routeQueueForm.setMessageId(null);
272        establishRequiredState(request, form);
273        return mapping.findForward("report");
274    }
275
276    public ActionForward executeMessageFetcher(ActionMapping mapping,  ActionForm form, HttpServletRequest request,
277            HttpServletResponse response) throws Exception {
278        MessageQueueForm routeQueueForm = (MessageQueueForm) form;
279        ActionMessages messages = new ActionMessages();
280        if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
281            messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages()));
282        }
283        if (!messages.isEmpty()) {
284            saveMessages(request, messages);
285            return mapping.findForward("report");
286        }
287        new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run();
288        return mapping.findForward("report");
289    }
290
291    /**
292         * Sets up the expected state by retrieving the selected RouteQueue by RouteQueueId, and placing it in the
293         * ExistingRouteQueue member.
294         *
295         * Called by the super's Execute method on every request.
296         */
297    @Override
298        public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception {
299        request.setAttribute("rice_constant", getServlet().getServletContext().getAttribute("RiceConstants"));
300        request.setAttribute("ksb_constant", getServlet().getServletContext().getAttribute("KSBConstants"));
301        MessageQueueForm routeQueueForm = (MessageQueueForm) form;
302        routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
303        routeQueueForm.setMyApplicationId(CoreConfigHelper.getApplicationId());
304        routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE));
305        routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
306        routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGING_OFF));
307        List<ServiceInfo> services = KsbApiServiceLocator.getServiceRegistry().getAllOnlineServices();
308        if (routeQueueForm.getMessageId() != null) {
309            PersistedMessageBO rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId());
310            if (rq != null) {
311                routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date()));
312                routeQueueForm.setMessageQueueFromDatabase(rq);
313                // establish IP addresses where this message could safely be forwarded to
314                String serviceName = rq.getServiceName();
315                for (ServiceInfo serviceInfo : services) {
316                    if (serviceInfo.getServiceName().equals(serviceName)) {
317                        routeQueueForm.getIpAddresses().add(
318                                new ConcreteKeyValue(serviceInfo.getServerIpAddress(), serviceInfo.getServerIpAddress()));
319                    }
320                }
321            } else {
322                ActionMessages messages = new ActionMessages();
323                messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
324                        "messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString()));
325                return messages;
326            }
327            routeQueueForm.setMessageId(null);
328        } else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) {
329            List<PersistedMessageBO> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1);
330            if (queueEntries.size() > 0) {
331                Collections.sort(queueEntries, new Comparator() {
332                    private Comparator comp = new ComparableComparator();
333
334                    @Override
335                        public int compare(Object object1, Object object2) {
336                        if (object1 == null && object2 == null) {
337                            return 0;
338                        } else if (object1 == null) {
339                            return 1;
340                        } else if (object2 == null) {
341                            return -1;
342                        }
343                        Long id1 = ((PersistedMessageBO) object1).getRouteQueueId();
344                        Long id2 = ((PersistedMessageBO) object2).getRouteQueueId();
345
346                        try {
347                            return this.comp.compare(id1, id2);
348                        } catch (Exception e) {
349                            return 0;
350                        }
351                    }
352                });
353            }
354            routeQueueForm.setMessageQueueRows(queueEntries);
355        }
356        return null;
357    }
358
359    protected List<PersistedMessageBO> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) {
360        List<PersistedMessageBO> routeQueues = new ArrayList<PersistedMessageBO>();
361
362        // no filter applied
363        if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
364            routeQueues.addAll(getRouteQueueService().findAll(maxRows));
365        }
366
367        // one or more filters applied
368        else {
369            if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) {
370                if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) {
371                    // TODO better error handling here
372                    throw new RuntimeException("Message Id must be a number.");
373                }
374            }
375
376            Map<String, String> criteriaValues = new HashMap<String, String>();
377            String key = null;
378            String value = null;
379            String trimmedKey = null;
380            for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) {
381                key = (String) iter.next();
382                if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
383                    value = request.getParameter(key);
384                    if (StringUtils.isNotBlank(value)) {
385                        trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX));
386                        criteriaValues.put(trimmedKey, value);
387                    }
388                }
389            }
390            routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows));
391        }
392        return routeQueues;
393    }
394
395    private MessageQueueService getRouteQueueService() {
396        return KSBServiceLocator.getMessageQueueService();
397    }
398
399    /**
400         * Extracts the payload from a PersistedMessageBO, attempts to convert it to the expected AsynchronousCall type, and
401         * returns it.
402         *
403         * Throws an IllegalArgumentException if the decoded payload isnt of the expected type.
404         *
405         * @param message
406         *                The populated PersistedMessageBO object to extract the payload from.
407         * @return Returns the payload if one is present and it can be deserialized, otherwise returns null.
408         */
409    protected AsynchronousCall unwrapPayload(PersistedMessageBO message) {
410        if (message == null || message.getPayload() == null) {
411            return null;
412        }
413        String encodedPayload = message.getPayload().getPayload();
414        if (StringUtils.isBlank(encodedPayload)) {
415            return null;
416        }
417        Object decodedPayload = null;
418        if (encodedPayload != null) {
419            decodedPayload = SerializationUtils.deserializeFromBase64(encodedPayload);
420        }
421        // fail fast if its not the expected type of AsynchronousCall
422        if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) {
423            throw new IllegalArgumentException("PersistedMessageBO payload was not of the expected class. " + "Expected was ["
424                    + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]");
425        }
426        return (AsynchronousCall) decodedPayload;
427    }
428
429}