001 /**
002 * Copyright 2005-2012 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.ksb.messaging.web;
017
018 import org.apache.commons.collections.comparators.ComparableComparator;
019 import org.apache.commons.lang.StringUtils;
020 import org.apache.commons.lang.math.NumberUtils;
021 import org.apache.struts.action.ActionForm;
022 import org.apache.struts.action.ActionForward;
023 import org.apache.struts.action.ActionMapping;
024 import org.apache.struts.action.ActionMessage;
025 import org.apache.struts.action.ActionMessages;
026 import org.kuali.rice.core.api.config.CoreConfigHelper;
027 import org.kuali.rice.core.api.config.property.ConfigContext;
028 import org.kuali.rice.core.api.util.ConcreteKeyValue;
029 import org.kuali.rice.core.api.util.RiceConstants;
030 import org.kuali.rice.core.api.util.RiceUtilities;
031 import org.kuali.rice.core.api.util.io.SerializationUtils;
032 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
033 import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
034 import org.kuali.rice.ksb.api.registry.ServiceInfo;
035 import org.kuali.rice.ksb.messaging.MessageFetcher;
036 import org.kuali.rice.ksb.messaging.MessageServiceInvoker;
037 import org.kuali.rice.ksb.messaging.PersistedMessageBO;
038 import org.kuali.rice.ksb.messaging.service.MessageQueueService;
039 import org.kuali.rice.ksb.service.KSBServiceLocator;
040 import org.kuali.rice.ksb.util.KSBConstants;
041
042 import javax.servlet.ServletException;
043 import javax.servlet.http.HttpServletRequest;
044 import javax.servlet.http.HttpServletResponse;
045 import java.io.IOException;
046 import java.sql.Timestamp;
047 import java.util.ArrayList;
048 import java.util.Calendar;
049 import java.util.Collections;
050 import java.util.Comparator;
051 import java.util.Date;
052 import java.util.HashMap;
053 import java.util.Iterator;
054 import java.util.List;
055 import java.util.Map;
056
057
058 /**
059 * Struts action for interacting with the queue of messages.
060 *
061 * @author Kuali Rice Team (rice.collab@kuali.org)
062 */
063 public class MessageQueueAction extends KSBAction {
064
065 @Override
066 public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request,
067 HttpServletResponse response) throws IOException, ServletException {
068 return mapping.findForward("report");
069 }
070
071 public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request,
072 HttpServletResponse response) throws Exception {
073 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
074 save(routeQueueForm);
075
076 routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
077 ActionMessages messages = new ActionMessages();
078 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved"));
079 saveMessages(request, messages);
080
081 // routeQueueForm.setMessageId(null);
082 //// routeQueueForm.setMessageQueueFromDatabase(null);
083 //// routeQueueForm.setMessageQueueFromForm(null);
084 // routeQueueForm.setShowEdit("yes");
085 // routeQueueForm.setMethodToCall("");
086 // establishRequiredState(request, form);
087 // routeQueueForm.setMessageId(routeQueueId);
088 //// routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
089 // routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
090 // routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
091 return mapping.findForward("report");
092 }
093
094 public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
095 HttpServletResponse response) throws Exception {
096 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
097 PersistedMessageBO message = save(routeQueueForm);
098 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
099
100 ActionMessages messages = new ActionMessages();
101 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
102 saveMessages(request, messages);
103
104 routeQueueForm.setMessageId(null);
105 routeQueueForm.setMessageQueueFromDatabase(null);
106 routeQueueForm.setMessageQueueFromForm(null);
107 routeQueueForm.setShowEdit("yes");
108 routeQueueForm.setMethodToCall("");
109 establishRequiredState(request, form);
110 routeQueueForm.setMessageId(message.getRouteQueueId());
111 routeQueueForm.setMessageQueueFromForm(message);
112 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
113 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
114 return mapping.findForward("report");
115 }
116
117 // public ActionForward saveAndForward(ActionMapping mapping, ActionForm form, HttpServletRequest request,
118 // HttpServletResponse response) throws Exception {
119 // MessageQueueForm routeQueueForm = (MessageQueueForm) form;
120 // PersistedMessageBO message = save(routeQueueForm);
121 // ForwardedCallHandler adminService = getAdminServiceToForwardTo(message, routeQueueForm);
122 // AsynchronousCall methodCall = message.getPayload().getMethodCall();
123 // message.setMethodCall(methodCall);
124 // adminService.handleCall(message);
125 // KSBServiceLocator.getRouteQueueService().delete(message);
126 //
127 // ActionMessages messages = new ActionMessages();
128 // messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
129 // saveMessages(request, messages);
130 //
131 // routeQueueForm.setMessageId(null);
132 // routeQueueForm.setMessageQueueFromDatabase(null);
133 // routeQueueForm.setMessageQueueFromForm(null);
134 // routeQueueForm.setShowEdit("yes");
135 // routeQueueForm.setMethodToCall("");
136 // establishRequiredState(request, form);
137 // routeQueueForm.setMessageId(message.getRouteQueueId());
138 // routeQueueForm.setMessageQueueFromForm(message);
139 // routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
140 // routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
141 // return mapping.findForward("report");
142 // }
143
144 private PersistedMessageBO save(MessageQueueForm routeQueueForm) {
145 Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
146 if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
147 throw new IllegalArgumentException("Invalid routeQueueId passed in. Cannot save");
148 }
149 // save the message
150 PersistedMessageBO existingMessage = KSBServiceLocator.getMessageQueueService().findByRouteQueueId(routeQueueId);
151 PersistedMessageBO message = routeQueueForm.getMessageQueueFromForm();
152 // copy the new values over
153 if (existingMessage == null) {
154 throw new RuntimeException("Could locate the existing message, it may have already been processed.");
155 }
156
157 existingMessage.setQueuePriority(message.getQueuePriority());
158 existingMessage.setIpNumber(message.getIpNumber());
159 existingMessage.setLockVerNbr(message.getLockVerNbr());
160 existingMessage.setApplicationId(message.getApplicationId());
161 existingMessage.setMethodName(message.getMethodName());
162 existingMessage.setQueueStatus(message.getQueueStatus());
163 existingMessage.setRetryCount(message.getRetryCount());
164 existingMessage.setServiceName(message.getServiceName());
165 existingMessage.setValue1(message.getValue1());
166 existingMessage.setValue2(message.getValue2());
167 KSBServiceLocator.getMessageQueueService().save(existingMessage);
168 return existingMessage;
169 }
170
171 /**
172 * Performs a quick ReQueue of the indicated persisted message.
173 *
174 * The net effect of this requeue is to set the Date to now, and to reset the RetryCount to zero. The payload is not
175 * modified.
176 *
177 * @param message
178 * The populated message to be requeued.
179 */
180 protected void quickRequeueMessage(PersistedMessageBO message) {
181 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
182 message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis()));
183 message.setRetryCount(new Integer(0));
184 getRouteQueueService().save(message);
185 }
186
187 public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request,
188 HttpServletResponse response) throws Exception {
189 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
190 if (routeQueueForm.getMessageQueueFromDatabase() == null) {
191 throw new IllegalArgumentException("No messageId passed in with the Request.");
192 }
193
194 PersistedMessageBO message = routeQueueForm.getMessageQueueFromDatabase();
195 quickRequeueMessage(message);
196 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
197
198 ActionMessages messages = new ActionMessages();
199 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued"));
200 saveMessages(request, messages);
201
202 routeQueueForm.setMessageQueueFromDatabase(null);
203 routeQueueForm.setMessageQueueFromForm(null);
204 routeQueueForm.setMessageId(null);
205 routeQueueForm.setMethodToCall("");
206
207 // re-run the state method to load the full set of rows
208 establishRequiredState(request, form);
209 return mapping.findForward("report");
210 }
211
212 public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
213 HttpServletResponse response) throws IOException, ServletException {
214 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
215 routeQueueForm.setShowEdit("yes");
216 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
217 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
218 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
219 return mapping.findForward("basic");
220 }
221
222 public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request,
223 HttpServletResponse response) throws Exception {
224 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
225 routeQueueForm.setShowEdit("no");
226 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
227 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
228 AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase());
229 routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload);
230 return mapping.findForward("payload");
231 }
232
233 public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request,
234 HttpServletResponse response) throws Exception {
235 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
236 if (routeQueueForm.getShowEdit().equals("yes")) {
237 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
238 }
239 return mapping.findForward("basic");
240 }
241
242 public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request,
243 HttpServletResponse response) throws Exception {
244 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
245 routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
246 routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
247 routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
248 routeQueueForm.getMessageQueueFromForm().setExpirationDate(null);
249 routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
250 routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
251 routeQueueForm.getMessageQueueFromForm().setServiceName(null);
252 routeQueueForm.getMessageQueueFromForm().setApplicationId(null);
253 routeQueueForm.getMessageQueueFromForm().setMethodName(null);
254 routeQueueForm.getMessageQueueFromForm().setPayload(null);
255 routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
256 routeQueueForm.setExistingQueueDate(null);
257 routeQueueForm.setNewQueueDate(null);
258 return mapping.findForward("basic");
259 }
260
261 public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request,
262 HttpServletResponse response) throws Exception {
263 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
264 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
265 routeQueueForm.setMessageQueueFromDatabase(null);
266 getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm());
267 ActionMessages messages = new ActionMessages();
268 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm
269 .getMessageQueueFromForm().getRouteQueueId().toString()));
270 saveMessages(request, messages);
271 routeQueueForm.setMessageId(null);
272 establishRequiredState(request, form);
273 return mapping.findForward("report");
274 }
275
276 public ActionForward executeMessageFetcher(ActionMapping mapping, ActionForm form, HttpServletRequest request,
277 HttpServletResponse response) throws Exception {
278 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
279 ActionMessages messages = new ActionMessages();
280 if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
281 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages()));
282 }
283 if (!messages.isEmpty()) {
284 saveMessages(request, messages);
285 return mapping.findForward("report");
286 }
287 new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run();
288 return mapping.findForward("report");
289 }
290
291 /**
292 * Sets up the expected state by retrieving the selected RouteQueue by RouteQueueId, and placing it in the
293 * ExistingRouteQueue member.
294 *
295 * Called by the super's Execute method on every request.
296 */
297 @Override
298 public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception {
299 request.setAttribute("rice_constant", getServlet().getServletContext().getAttribute("RiceConstants"));
300 request.setAttribute("ksb_constant", getServlet().getServletContext().getAttribute("KSBConstants"));
301 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
302 routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
303 routeQueueForm.setMyApplicationId(CoreConfigHelper.getApplicationId());
304 routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE));
305 routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
306 routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGING_OFF));
307 List<ServiceInfo> services = KsbApiServiceLocator.getServiceRegistry().getAllOnlineServices();
308 if (routeQueueForm.getMessageId() != null) {
309 PersistedMessageBO rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId());
310 if (rq != null) {
311 routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date()));
312 routeQueueForm.setMessageQueueFromDatabase(rq);
313 // establish IP addresses where this message could safely be forwarded to
314 String serviceName = rq.getServiceName();
315 for (ServiceInfo serviceInfo : services) {
316 if (serviceInfo.getServiceName().equals(serviceName)) {
317 routeQueueForm.getIpAddresses().add(
318 new ConcreteKeyValue(serviceInfo.getServerIpAddress(), serviceInfo.getServerIpAddress()));
319 }
320 }
321 } else {
322 ActionMessages messages = new ActionMessages();
323 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
324 "messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString()));
325 return messages;
326 }
327 routeQueueForm.setMessageId(null);
328 } else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) {
329 List<PersistedMessageBO> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1);
330 if (queueEntries.size() > 0) {
331 Collections.sort(queueEntries, new Comparator() {
332 private Comparator comp = new ComparableComparator();
333
334 @Override
335 public int compare(Object object1, Object object2) {
336 if (object1 == null && object2 == null) {
337 return 0;
338 } else if (object1 == null) {
339 return 1;
340 } else if (object2 == null) {
341 return -1;
342 }
343 Long id1 = ((PersistedMessageBO) object1).getRouteQueueId();
344 Long id2 = ((PersistedMessageBO) object2).getRouteQueueId();
345
346 try {
347 return this.comp.compare(id1, id2);
348 } catch (Exception e) {
349 return 0;
350 }
351 }
352 });
353 }
354 routeQueueForm.setMessageQueueRows(queueEntries);
355 }
356 return null;
357 }
358
359 protected List<PersistedMessageBO> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) {
360 List<PersistedMessageBO> routeQueues = new ArrayList<PersistedMessageBO>();
361
362 // no filter applied
363 if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
364 routeQueues.addAll(getRouteQueueService().findAll(maxRows));
365 }
366
367 // one or more filters applied
368 else {
369 if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) {
370 if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) {
371 // TODO better error handling here
372 throw new RuntimeException("Message Id must be a number.");
373 }
374 }
375
376 Map<String, String> criteriaValues = new HashMap<String, String>();
377 String key = null;
378 String value = null;
379 String trimmedKey = null;
380 for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) {
381 key = (String) iter.next();
382 if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
383 value = request.getParameter(key);
384 if (StringUtils.isNotBlank(value)) {
385 trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX));
386 criteriaValues.put(trimmedKey, value);
387 }
388 }
389 }
390 routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows));
391 }
392 return routeQueues;
393 }
394
395 private MessageQueueService getRouteQueueService() {
396 return KSBServiceLocator.getMessageQueueService();
397 }
398
399 /**
400 * Extracts the payload from a PersistedMessageBO, attempts to convert it to the expected AsynchronousCall type, and
401 * returns it.
402 *
403 * Throws an IllegalArgumentException if the decoded payload isnt of the expected type.
404 *
405 * @param message
406 * The populated PersistedMessageBO object to extract the payload from.
407 * @return Returns the payload if one is present and it can be deserialized, otherwise returns null.
408 */
409 protected AsynchronousCall unwrapPayload(PersistedMessageBO message) {
410 if (message == null || message.getPayload() == null) {
411 return null;
412 }
413 String encodedPayload = message.getPayload().getPayload();
414 if (StringUtils.isBlank(encodedPayload)) {
415 return null;
416 }
417 Object decodedPayload = null;
418 if (encodedPayload != null) {
419 decodedPayload = SerializationUtils.deserializeFromBase64(encodedPayload);
420 }
421 // fail fast if its not the expected type of AsynchronousCall
422 if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) {
423 throw new IllegalArgumentException("PersistedMessageBO payload was not of the expected class. " + "Expected was ["
424 + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]");
425 }
426 return (AsynchronousCall) decodedPayload;
427 }
428
429 }