View Javadoc
1   /**
2    * Copyright 2005-2016 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.web;
17  
18  import org.apache.commons.collections.comparators.ComparableComparator;
19  import org.apache.commons.lang.StringUtils;
20  import org.apache.commons.lang.math.NumberUtils;
21  import org.apache.struts.action.ActionForm;
22  import org.apache.struts.action.ActionForward;
23  import org.apache.struts.action.ActionMapping;
24  import org.apache.struts.action.ActionMessage;
25  import org.apache.struts.action.ActionMessages;
26  import org.kuali.rice.core.api.config.CoreConfigHelper;
27  import org.kuali.rice.core.api.config.property.ConfigContext;
28  import org.kuali.rice.core.api.util.ConcreteKeyValue;
29  import org.kuali.rice.core.api.util.RiceConstants;
30  import org.kuali.rice.core.api.util.RiceUtilities;
31  import org.kuali.rice.core.api.util.io.SerializationUtils;
32  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
33  import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
34  import org.kuali.rice.ksb.api.registry.ServiceInfo;
35  import org.kuali.rice.ksb.messaging.MessageFetcher;
36  import org.kuali.rice.ksb.messaging.MessageServiceInvoker;
37  import org.kuali.rice.ksb.messaging.PersistedMessageBO;
38  import org.kuali.rice.ksb.messaging.service.MessageQueueService;
39  import org.kuali.rice.ksb.service.KSBServiceLocator;
40  import org.kuali.rice.ksb.util.KSBConstants;
41  
42  import javax.servlet.ServletException;
43  import javax.servlet.http.HttpServletRequest;
44  import javax.servlet.http.HttpServletResponse;
45  import java.io.IOException;
46  import java.sql.Timestamp;
47  import java.util.ArrayList;
48  import java.util.Calendar;
49  import java.util.Collections;
50  import java.util.Comparator;
51  import java.util.Date;
52  import java.util.HashMap;
53  import java.util.Iterator;
54  import java.util.List;
55  import java.util.Map;
56  
57  
58  /**
59   * Struts action for interacting with the queue of messages.
60   *
61   * @author Kuali Rice Team (rice.collab@kuali.org)
62   */
63  public class MessageQueueAction extends KSBAction {
64  
65      @Override
66  	public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request,
67  	    HttpServletResponse response) throws IOException, ServletException {
68  	return mapping.findForward("report");
69      }
70  
71      public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request,
72  	    HttpServletResponse response) throws Exception {
73  	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
74  	save(routeQueueForm);
75  
76  	routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
77  	ActionMessages messages = new ActionMessages();
78  	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved"));
79  	saveMessages(request, messages);
80  
81  //	routeQueueForm.setMessageId(null);
82  ////	routeQueueForm.setMessageQueueFromDatabase(null);
83  ////	routeQueueForm.setMessageQueueFromForm(null);
84  //	routeQueueForm.setShowEdit("yes");
85  //	routeQueueForm.setMethodToCall("");
86  //	establishRequiredState(request, form);
87  //	routeQueueForm.setMessageId(routeQueueId);
88  ////	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
89  //	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
90  //	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
91  	return mapping.findForward("report");
92      }
93  
94      public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
95  	    HttpServletResponse response) throws Exception {
96  	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
97  	PersistedMessageBO message = save(routeQueueForm);
98  	KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
99  
100 	ActionMessages messages = new ActionMessages();
101 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
102 	saveMessages(request, messages);
103 
104 	routeQueueForm.setMessageId(null);
105 	routeQueueForm.setMessageQueueFromDatabase(null);
106 	routeQueueForm.setMessageQueueFromForm(null);
107 	routeQueueForm.setShowEdit("yes");
108 	routeQueueForm.setMethodToCall("");
109 	establishRequiredState(request, form);
110 	routeQueueForm.setMessageId(message.getRouteQueueId());
111 	routeQueueForm.setMessageQueueFromForm(message);
112 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
113 	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
114 	return mapping.findForward("report");
115     }
116 
117 //    public ActionForward saveAndForward(ActionMapping mapping, ActionForm form, HttpServletRequest request,
118 //	    HttpServletResponse response) throws Exception {
119 //	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
120 //	PersistedMessageBO message = save(routeQueueForm);
121 //	ForwardedCallHandler adminService = getAdminServiceToForwardTo(message, routeQueueForm);
122 //	AsynchronousCall methodCall = message.getPayload().getMethodCall();
123 //	message.setMethodCall(methodCall);
124 //	adminService.handleCall(message);
125 //	KSBServiceLocator.getRouteQueueService().delete(message);
126 //
127 //	ActionMessages messages = new ActionMessages();
128 //	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
129 //	saveMessages(request, messages);
130 //
131 //	routeQueueForm.setMessageId(null);
132 //	routeQueueForm.setMessageQueueFromDatabase(null);
133 //	routeQueueForm.setMessageQueueFromForm(null);
134 //	routeQueueForm.setShowEdit("yes");
135 //	routeQueueForm.setMethodToCall("");
136 //	establishRequiredState(request, form);
137 //	routeQueueForm.setMessageId(message.getRouteQueueId());
138 //	routeQueueForm.setMessageQueueFromForm(message);
139 //	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
140 //	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
141 //	return mapping.findForward("report");
142 //    }
143 
144     private PersistedMessageBO save(MessageQueueForm routeQueueForm) {
145 	Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
146 	if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
147 	    throw new IllegalArgumentException("Invalid routeQueueId passed in.  Cannot save");
148 	}
149 	// save the message
150 	PersistedMessageBO existingMessage = KSBServiceLocator.getMessageQueueService().findByRouteQueueId(routeQueueId);
151 	PersistedMessageBO message = routeQueueForm.getMessageQueueFromForm();
152 	// copy the new values over
153 	if (existingMessage == null) {
154 	    throw new RuntimeException("Could locate the existing message, it may have already been processed.");
155 	}
156 
157 	existingMessage.setQueuePriority(message.getQueuePriority());
158 	existingMessage.setIpNumber(message.getIpNumber());
159 	existingMessage.setLockVerNbr(message.getLockVerNbr());
160 	existingMessage.setApplicationId(message.getApplicationId());
161 	existingMessage.setMethodName(message.getMethodName());
162 	existingMessage.setQueueStatus(message.getQueueStatus());
163 	existingMessage.setRetryCount(message.getRetryCount());
164 	existingMessage.setServiceName(message.getServiceName());
165 	existingMessage.setValue1(message.getValue1());
166 	existingMessage.setValue2(message.getValue2());
167 	existingMessage = KSBServiceLocator.getMessageQueueService().save(existingMessage);
168 	return existingMessage;
169     }
170 
171     /**
172          * Performs a quick ReQueue of the indicated persisted message.
173          *
174          * The net effect of this requeue is to set the Date to now, and to reset the RetryCount to zero. The payload is not
175          * modified.
176          *
177          * @param message
178          *                The populated message to be requeued.
179          */
180     protected PersistedMessageBO quickRequeueMessage(PersistedMessageBO message) {
181 	message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
182 	message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis()));
183 	message.setRetryCount(new Integer(0));
184 	return getRouteQueueService().save(message);
185     }
186 
187     public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request,
188 	    HttpServletResponse response) throws Exception {
189 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
190 	if (routeQueueForm.getMessageQueueFromDatabase() == null) {
191 	    throw new IllegalArgumentException("No messageId passed in with the Request.");
192 	}
193 
194 	PersistedMessageBO message = routeQueueForm.getMessageQueueFromDatabase();
195 	message = quickRequeueMessage(message);
196 	KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
197 
198 	ActionMessages messages = new ActionMessages();
199 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued"));
200 	saveMessages(request, messages);
201 
202 	routeQueueForm.setMessageQueueFromDatabase(null);
203 	routeQueueForm.setMessageQueueFromForm(null);
204 	routeQueueForm.setMessageId(null);
205 	routeQueueForm.setMethodToCall("");
206 
207 	// re-run the state method to load the full set of rows
208 	establishRequiredState(request, form);
209 	return mapping.findForward("report");
210     }
211 
212     public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
213 	    HttpServletResponse response) throws IOException, ServletException {
214 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
215 	routeQueueForm.setShowEdit("yes");
216 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
217 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
218 	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
219 	return mapping.findForward("basic");
220     }
221 
222     public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request,
223 	    HttpServletResponse response) throws Exception {
224 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
225 	routeQueueForm.setShowEdit("no");
226 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
227 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
228 	AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase());
229 	routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload);
230 	return mapping.findForward("payload");
231     }
232 
233     public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request,
234 	    HttpServletResponse response) throws Exception {
235 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
236 	if (routeQueueForm.getShowEdit().equals("yes")) {
237 	    routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
238 	}
239 	return mapping.findForward("basic");
240     }
241 
242     public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request,
243 	    HttpServletResponse response) throws Exception {
244 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
245 	routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
246 	routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
247 	routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
248 	routeQueueForm.getMessageQueueFromForm().setExpirationDate(null);
249 	routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
250 	routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
251 	routeQueueForm.getMessageQueueFromForm().setServiceName(null);
252 	routeQueueForm.getMessageQueueFromForm().setApplicationId(null);
253 	routeQueueForm.getMessageQueueFromForm().setMethodName(null);
254 	routeQueueForm.getMessageQueueFromForm().setPayload(null);
255 	routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
256 	routeQueueForm.setExistingQueueDate(null);
257 	routeQueueForm.setNewQueueDate(null);
258 	return mapping.findForward("basic");
259     }
260 
261     public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request,
262 	    HttpServletResponse response) throws Exception {
263 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
264 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
265 	routeQueueForm.setMessageQueueFromDatabase(null);
266 	getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm());
267 	ActionMessages messages = new ActionMessages();
268 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm
269 		.getMessageQueueFromForm().getRouteQueueId().toString()));
270 	saveMessages(request, messages);
271 	routeQueueForm.setMessageId(null);
272 	establishRequiredState(request, form);
273 	return mapping.findForward("report");
274     }
275 
276     public ActionForward executeMessageFetcher(ActionMapping mapping,  ActionForm form, HttpServletRequest request,
277 	    HttpServletResponse response) throws Exception {
278 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
279 	ActionMessages messages = new ActionMessages();
280 	if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
281 	    messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages()));
282 	}
283 	if (!messages.isEmpty()) {
284 	    saveMessages(request, messages);
285 	    return mapping.findForward("report");
286 	}
287 	new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run();
288 	return mapping.findForward("report");
289     }
290 
291     /**
292          * Sets up the expected state by retrieving the selected RouteQueue by RouteQueueId, and placing it in the
293          * ExistingRouteQueue member.
294          *
295          * Called by the super's Execute method on every request.
296          */
297     @Override
298 	public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception {
299 	request.setAttribute("rice_constant", getServlet().getServletContext().getAttribute("RiceConstants"));
300 	request.setAttribute("ksb_constant", getServlet().getServletContext().getAttribute("KSBConstants"));
301 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
302 	routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
303 	routeQueueForm.setMyApplicationId(CoreConfigHelper.getApplicationId());
304 	routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE));
305 	routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
306 	routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGING_OFF));
307 	List<ServiceInfo> services = KsbApiServiceLocator.getServiceRegistry().getAllOnlineServices();
308 	if (routeQueueForm.getMessageId() != null) {
309 	    PersistedMessageBO rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId());
310 	    if (rq != null) {
311 		routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date()));
312 		routeQueueForm.setMessageQueueFromDatabase(rq);
313 		// establish IP addresses where this message could safely be forwarded to
314 		String serviceName = rq.getServiceName();
315 		for (ServiceInfo serviceInfo : services) {
316 		    if (serviceInfo.getServiceName().equals(serviceName)) {
317 			routeQueueForm.getIpAddresses().add(
318 				new ConcreteKeyValue(serviceInfo.getServerIpAddress(), serviceInfo.getServerIpAddress()));
319 		    }
320 		}
321 	    } else {
322 		ActionMessages messages = new ActionMessages();
323 		messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
324 			"messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString()));
325 		return messages;
326 	    }
327 	    routeQueueForm.setMessageId(null);
328 	} else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) {
329 	    List<PersistedMessageBO> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1);
330 	    if (queueEntries.size() > 0) {
331 		Collections.sort(queueEntries, new Comparator() {
332 		    private Comparator comp = new ComparableComparator();
333 
334 		    @Override
335 			public int compare(Object object1, Object object2) {
336 			if (object1 == null && object2 == null) {
337 			    return 0;
338 			} else if (object1 == null) {
339 			    return 1;
340 			} else if (object2 == null) {
341 			    return -1;
342 			}
343 			Long id1 = ((PersistedMessageBO) object1).getRouteQueueId();
344 			Long id2 = ((PersistedMessageBO) object2).getRouteQueueId();
345 
346 			try {
347 			    return this.comp.compare(id1, id2);
348 			} catch (Exception e) {
349 			    return 0;
350 			}
351 		    }
352 		});
353 	    }
354 	    routeQueueForm.setMessageQueueRows(queueEntries);
355 	}
356 	return null;
357     }
358 
359     protected List<PersistedMessageBO> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) {
360 	List<PersistedMessageBO> routeQueues = new ArrayList<PersistedMessageBO>();
361 
362 	// no filter applied
363 	if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
364 	    routeQueues.addAll(getRouteQueueService().findAll(maxRows));
365 	}
366 
367 	// one or more filters applied
368 	else {
369 	    if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) {
370 		if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) {
371 		    // TODO better error handling here
372 		    throw new RuntimeException("Message Id must be a number.");
373 		}
374 	    }
375 
376 	    Map<String, String> criteriaValues = new HashMap<String, String>();
377 	    String key = null;
378 	    String value = null;
379 	    String trimmedKey = null;
380 	    for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) {
381 		key = (String) iter.next();
382 		if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
383 		    value = request.getParameter(key);
384 		    if (StringUtils.isNotBlank(value)) {
385 			trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX));
386 			criteriaValues.put(trimmedKey, value);
387 		    }
388 		}
389 	    }
390 	    routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows));
391 	}
392 	return routeQueues;
393     }
394 
395     private MessageQueueService getRouteQueueService() {
396 	return KSBServiceLocator.getMessageQueueService();
397     }
398 
399     /**
400          * Extracts the payload from a PersistedMessageBO, attempts to convert it to the expected AsynchronousCall type, and
401          * returns it.
402          *
403          * Throws an IllegalArgumentException if the decoded payload isnt of the expected type.
404          *
405          * @param message
406          *                The populated PersistedMessageBO object to extract the payload from.
407          * @return Returns the payload if one is present and it can be deserialized, otherwise returns null.
408          */
409     protected AsynchronousCall unwrapPayload(PersistedMessageBO message) {
410 	if (message == null || message.getPayload() == null) {
411 	    return null;
412 	}
413 	String encodedPayload = message.getPayload().getPayload();
414 	if (StringUtils.isBlank(encodedPayload)) {
415 	    return null;
416 	}
417 	Object decodedPayload = null;
418 	if (encodedPayload != null) {
419 	    decodedPayload = SerializationUtils.deserializeFromBase64(encodedPayload);
420 	}
421 	// fail fast if its not the expected type of AsynchronousCall
422 	if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) {
423 	    throw new IllegalArgumentException("PersistedMessageBO payload was not of the expected class. " + "Expected was ["
424 		    + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]");
425 	}
426 	return (AsynchronousCall) decodedPayload;
427     }
428 
429 }