001 /** 002 * Copyright 2005-2012 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.ksb.messaging.web; 017 018 import org.apache.commons.collections.comparators.ComparableComparator; 019 import org.apache.commons.lang.StringUtils; 020 import org.apache.commons.lang.math.NumberUtils; 021 import org.apache.struts.action.ActionForm; 022 import org.apache.struts.action.ActionForward; 023 import org.apache.struts.action.ActionMapping; 024 import org.apache.struts.action.ActionMessage; 025 import org.apache.struts.action.ActionMessages; 026 import org.kuali.rice.core.api.config.CoreConfigHelper; 027 import org.kuali.rice.core.api.config.property.ConfigContext; 028 import org.kuali.rice.core.api.util.ConcreteKeyValue; 029 import org.kuali.rice.core.api.util.RiceConstants; 030 import org.kuali.rice.core.api.util.RiceUtilities; 031 import org.kuali.rice.core.api.util.io.SerializationUtils; 032 import org.kuali.rice.ksb.api.KsbApiServiceLocator; 033 import org.kuali.rice.ksb.api.messaging.AsynchronousCall; 034 import org.kuali.rice.ksb.api.registry.ServiceInfo; 035 import org.kuali.rice.ksb.messaging.MessageFetcher; 036 import org.kuali.rice.ksb.messaging.MessageServiceInvoker; 037 import org.kuali.rice.ksb.messaging.PersistedMessageBO; 038 import org.kuali.rice.ksb.messaging.service.MessageQueueService; 039 import org.kuali.rice.ksb.service.KSBServiceLocator; 040 import org.kuali.rice.ksb.util.KSBConstants; 041 042 import javax.servlet.ServletException; 043 import javax.servlet.http.HttpServletRequest; 044 import javax.servlet.http.HttpServletResponse; 045 import java.io.IOException; 046 import java.sql.Timestamp; 047 import java.util.ArrayList; 048 import java.util.Calendar; 049 import java.util.Collections; 050 import java.util.Comparator; 051 import java.util.Date; 052 import java.util.HashMap; 053 import java.util.Iterator; 054 import java.util.List; 055 import java.util.Map; 056 057 058 /** 059 * Struts action for interacting with the queue of messages. 060 * 061 * @author Kuali Rice Team (rice.collab@kuali.org) 062 */ 063 public class MessageQueueAction extends KSBAction { 064 065 @Override 066 public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request, 067 HttpServletResponse response) throws IOException, ServletException { 068 return mapping.findForward("report"); 069 } 070 071 public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request, 072 HttpServletResponse response) throws Exception { 073 MessageQueueForm routeQueueForm = (MessageQueueForm) form; 074 save(routeQueueForm); 075 076 routeQueueForm.getMessageQueueFromForm().getRouteQueueId(); 077 ActionMessages messages = new ActionMessages(); 078 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved")); 079 saveMessages(request, messages); 080 081 // routeQueueForm.setMessageId(null); 082 //// routeQueueForm.setMessageQueueFromDatabase(null); 083 //// routeQueueForm.setMessageQueueFromForm(null); 084 // routeQueueForm.setShowEdit("yes"); 085 // routeQueueForm.setMethodToCall(""); 086 // establishRequiredState(request, form); 087 // routeQueueForm.setMessageId(routeQueueId); 088 //// routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase()); 089 // routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate()); 090 // routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm())); 091 return mapping.findForward("report"); 092 } 093 094 public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request, 095 HttpServletResponse response) throws Exception { 096 MessageQueueForm routeQueueForm = (MessageQueueForm) form; 097 PersistedMessageBO message = save(routeQueueForm); 098 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message)); 099 100 ActionMessages messages = new ActionMessages(); 101 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued")); 102 saveMessages(request, messages); 103 104 routeQueueForm.setMessageId(null); 105 routeQueueForm.setMessageQueueFromDatabase(null); 106 routeQueueForm.setMessageQueueFromForm(null); 107 routeQueueForm.setShowEdit("yes"); 108 routeQueueForm.setMethodToCall(""); 109 establishRequiredState(request, form); 110 routeQueueForm.setMessageId(message.getRouteQueueId()); 111 routeQueueForm.setMessageQueueFromForm(message); 112 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate()); 113 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message)); 114 return mapping.findForward("report"); 115 } 116 117 // public ActionForward saveAndForward(ActionMapping mapping, ActionForm form, HttpServletRequest request, 118 // HttpServletResponse response) throws Exception { 119 // MessageQueueForm routeQueueForm = (MessageQueueForm) form; 120 // PersistedMessageBO message = save(routeQueueForm); 121 // ForwardedCallHandler adminService = getAdminServiceToForwardTo(message, routeQueueForm); 122 // AsynchronousCall methodCall = message.getPayload().getMethodCall(); 123 // message.setMethodCall(methodCall); 124 // adminService.handleCall(message); 125 // KSBServiceLocator.getRouteQueueService().delete(message); 126 // 127 // ActionMessages messages = new ActionMessages(); 128 // messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued")); 129 // saveMessages(request, messages); 130 // 131 // routeQueueForm.setMessageId(null); 132 // routeQueueForm.setMessageQueueFromDatabase(null); 133 // routeQueueForm.setMessageQueueFromForm(null); 134 // routeQueueForm.setShowEdit("yes"); 135 // routeQueueForm.setMethodToCall(""); 136 // establishRequiredState(request, form); 137 // routeQueueForm.setMessageId(message.getRouteQueueId()); 138 // routeQueueForm.setMessageQueueFromForm(message); 139 // routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate()); 140 // routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message)); 141 // return mapping.findForward("report"); 142 // } 143 144 private PersistedMessageBO save(MessageQueueForm routeQueueForm) { 145 Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId(); 146 if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) { 147 throw new IllegalArgumentException("Invalid routeQueueId passed in. Cannot save"); 148 } 149 // save the message 150 PersistedMessageBO existingMessage = KSBServiceLocator.getMessageQueueService().findByRouteQueueId(routeQueueId); 151 PersistedMessageBO message = routeQueueForm.getMessageQueueFromForm(); 152 // copy the new values over 153 if (existingMessage == null) { 154 throw new RuntimeException("Could locate the existing message, it may have already been processed."); 155 } 156 157 existingMessage.setQueuePriority(message.getQueuePriority()); 158 existingMessage.setIpNumber(message.getIpNumber()); 159 existingMessage.setLockVerNbr(message.getLockVerNbr()); 160 existingMessage.setApplicationId(message.getApplicationId()); 161 existingMessage.setMethodName(message.getMethodName()); 162 existingMessage.setQueueStatus(message.getQueueStatus()); 163 existingMessage.setRetryCount(message.getRetryCount()); 164 existingMessage.setServiceName(message.getServiceName()); 165 existingMessage.setValue1(message.getValue1()); 166 existingMessage.setValue2(message.getValue2()); 167 KSBServiceLocator.getMessageQueueService().save(existingMessage); 168 return existingMessage; 169 } 170 171 /** 172 * Performs a quick ReQueue of the indicated persisted message. 173 * 174 * The net effect of this requeue is to set the Date to now, and to reset the RetryCount to zero. The payload is not 175 * modified. 176 * 177 * @param message 178 * The populated message to be requeued. 179 */ 180 protected void quickRequeueMessage(PersistedMessageBO message) { 181 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING); 182 message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis())); 183 message.setRetryCount(new Integer(0)); 184 getRouteQueueService().save(message); 185 } 186 187 public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request, 188 HttpServletResponse response) throws Exception { 189 MessageQueueForm routeQueueForm = (MessageQueueForm) form; 190 if (routeQueueForm.getMessageQueueFromDatabase() == null) { 191 throw new IllegalArgumentException("No messageId passed in with the Request."); 192 } 193 194 PersistedMessageBO message = routeQueueForm.getMessageQueueFromDatabase(); 195 quickRequeueMessage(message); 196 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message)); 197 198 ActionMessages messages = new ActionMessages(); 199 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued")); 200 saveMessages(request, messages); 201 202 routeQueueForm.setMessageQueueFromDatabase(null); 203 routeQueueForm.setMessageQueueFromForm(null); 204 routeQueueForm.setMessageId(null); 205 routeQueueForm.setMethodToCall(""); 206 207 // re-run the state method to load the full set of rows 208 establishRequiredState(request, form); 209 return mapping.findForward("report"); 210 } 211 212 public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request, 213 HttpServletResponse response) throws IOException, ServletException { 214 MessageQueueForm routeQueueForm = (MessageQueueForm) form; 215 routeQueueForm.setShowEdit("yes"); 216 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase()); 217 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate()); 218 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm())); 219 return mapping.findForward("basic"); 220 } 221 222 public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request, 223 HttpServletResponse response) throws Exception { 224 MessageQueueForm routeQueueForm = (MessageQueueForm) form; 225 routeQueueForm.setShowEdit("no"); 226 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase()); 227 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate()); 228 AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase()); 229 routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload); 230 return mapping.findForward("payload"); 231 } 232 233 public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request, 234 HttpServletResponse response) throws Exception { 235 MessageQueueForm routeQueueForm = (MessageQueueForm) form; 236 if (routeQueueForm.getShowEdit().equals("yes")) { 237 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase()); 238 } 239 return mapping.findForward("basic"); 240 } 241 242 public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request, 243 HttpServletResponse response) throws Exception { 244 MessageQueueForm routeQueueForm = (MessageQueueForm) form; 245 routeQueueForm.getMessageQueueFromForm().setQueuePriority(null); 246 routeQueueForm.getMessageQueueFromForm().setQueueStatus(null); 247 routeQueueForm.getMessageQueueFromForm().setQueueDate(null); 248 routeQueueForm.getMessageQueueFromForm().setExpirationDate(null); 249 routeQueueForm.getMessageQueueFromForm().setRetryCount(null); 250 routeQueueForm.getMessageQueueFromForm().setIpNumber(null); 251 routeQueueForm.getMessageQueueFromForm().setServiceName(null); 252 routeQueueForm.getMessageQueueFromForm().setApplicationId(null); 253 routeQueueForm.getMessageQueueFromForm().setMethodName(null); 254 routeQueueForm.getMessageQueueFromForm().setPayload(null); 255 routeQueueForm.getMessageQueueFromForm().setMethodCall(null); 256 routeQueueForm.setExistingQueueDate(null); 257 routeQueueForm.setNewQueueDate(null); 258 return mapping.findForward("basic"); 259 } 260 261 public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request, 262 HttpServletResponse response) throws Exception { 263 MessageQueueForm routeQueueForm = (MessageQueueForm) form; 264 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase()); 265 routeQueueForm.setMessageQueueFromDatabase(null); 266 getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm()); 267 ActionMessages messages = new ActionMessages(); 268 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm 269 .getMessageQueueFromForm().getRouteQueueId().toString())); 270 saveMessages(request, messages); 271 routeQueueForm.setMessageId(null); 272 establishRequiredState(request, form); 273 return mapping.findForward("report"); 274 } 275 276 public ActionForward executeMessageFetcher(ActionMapping mapping, ActionForm form, HttpServletRequest request, 277 HttpServletResponse response) throws Exception { 278 MessageQueueForm routeQueueForm = (MessageQueueForm) form; 279 ActionMessages messages = new ActionMessages(); 280 if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) { 281 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages())); 282 } 283 if (!messages.isEmpty()) { 284 saveMessages(request, messages); 285 return mapping.findForward("report"); 286 } 287 new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run(); 288 return mapping.findForward("report"); 289 } 290 291 /** 292 * Sets up the expected state by retrieving the selected RouteQueue by RouteQueueId, and placing it in the 293 * ExistingRouteQueue member. 294 * 295 * Called by the super's Execute method on every request. 296 */ 297 @Override 298 public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception { 299 request.setAttribute("rice_constant", getServlet().getServletContext().getAttribute("RiceConstants")); 300 request.setAttribute("ksb_constant", getServlet().getServletContext().getAttribute("KSBConstants")); 301 MessageQueueForm routeQueueForm = (MessageQueueForm) form; 302 routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber()); 303 routeQueueForm.setMyApplicationId(CoreConfigHelper.getApplicationId()); 304 routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE)); 305 routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY)); 306 routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGING_OFF)); 307 List<ServiceInfo> services = KsbApiServiceLocator.getServiceRegistry().getAllOnlineServices(); 308 if (routeQueueForm.getMessageId() != null) { 309 PersistedMessageBO rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId()); 310 if (rq != null) { 311 routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date())); 312 routeQueueForm.setMessageQueueFromDatabase(rq); 313 // establish IP addresses where this message could safely be forwarded to 314 String serviceName = rq.getServiceName(); 315 for (ServiceInfo serviceInfo : services) { 316 if (serviceInfo.getServiceName().equals(serviceName)) { 317 routeQueueForm.getIpAddresses().add( 318 new ConcreteKeyValue(serviceInfo.getServerIpAddress(), serviceInfo.getServerIpAddress())); 319 } 320 } 321 } else { 322 ActionMessages messages = new ActionMessages(); 323 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage( 324 "messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString())); 325 return messages; 326 } 327 routeQueueForm.setMessageId(null); 328 } else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) { 329 List<PersistedMessageBO> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1); 330 if (queueEntries.size() > 0) { 331 Collections.sort(queueEntries, new Comparator() { 332 private Comparator comp = new ComparableComparator(); 333 334 @Override 335 public int compare(Object object1, Object object2) { 336 if (object1 == null && object2 == null) { 337 return 0; 338 } else if (object1 == null) { 339 return 1; 340 } else if (object2 == null) { 341 return -1; 342 } 343 Long id1 = ((PersistedMessageBO) object1).getRouteQueueId(); 344 Long id2 = ((PersistedMessageBO) object2).getRouteQueueId(); 345 346 try { 347 return this.comp.compare(id1, id2); 348 } catch (Exception e) { 349 return 0; 350 } 351 } 352 }); 353 } 354 routeQueueForm.setMessageQueueRows(queueEntries); 355 } 356 return null; 357 } 358 359 protected List<PersistedMessageBO> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) { 360 List<PersistedMessageBO> routeQueues = new ArrayList<PersistedMessageBO>(); 361 362 // no filter applied 363 if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) { 364 routeQueues.addAll(getRouteQueueService().findAll(maxRows)); 365 } 366 367 // one or more filters applied 368 else { 369 if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) { 370 if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) { 371 // TODO better error handling here 372 throw new RuntimeException("Message Id must be a number."); 373 } 374 } 375 376 Map<String, String> criteriaValues = new HashMap<String, String>(); 377 String key = null; 378 String value = null; 379 String trimmedKey = null; 380 for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) { 381 key = (String) iter.next(); 382 if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) { 383 value = request.getParameter(key); 384 if (StringUtils.isNotBlank(value)) { 385 trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)); 386 criteriaValues.put(trimmedKey, value); 387 } 388 } 389 } 390 routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows)); 391 } 392 return routeQueues; 393 } 394 395 private MessageQueueService getRouteQueueService() { 396 return KSBServiceLocator.getMessageQueueService(); 397 } 398 399 /** 400 * Extracts the payload from a PersistedMessageBO, attempts to convert it to the expected AsynchronousCall type, and 401 * returns it. 402 * 403 * Throws an IllegalArgumentException if the decoded payload isnt of the expected type. 404 * 405 * @param message 406 * The populated PersistedMessageBO object to extract the payload from. 407 * @return Returns the payload if one is present and it can be deserialized, otherwise returns null. 408 */ 409 protected AsynchronousCall unwrapPayload(PersistedMessageBO message) { 410 if (message == null || message.getPayload() == null) { 411 return null; 412 } 413 String encodedPayload = message.getPayload().getPayload(); 414 if (StringUtils.isBlank(encodedPayload)) { 415 return null; 416 } 417 Object decodedPayload = null; 418 if (encodedPayload != null) { 419 decodedPayload = SerializationUtils.deserializeFromBase64(encodedPayload); 420 } 421 // fail fast if its not the expected type of AsynchronousCall 422 if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) { 423 throw new IllegalArgumentException("PersistedMessageBO payload was not of the expected class. " + "Expected was [" 424 + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]"); 425 } 426 return (AsynchronousCall) decodedPayload; 427 } 428 429 }