View Javadoc

1   /**
2    * Copyright 2005-2011 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.web;
17  
18  import org.apache.commons.collections.comparators.ComparableComparator;
19  import org.apache.commons.lang.StringUtils;
20  import org.apache.commons.lang.math.NumberUtils;
21  import org.apache.struts.action.ActionForm;
22  import org.apache.struts.action.ActionForward;
23  import org.apache.struts.action.ActionMapping;
24  import org.apache.struts.action.ActionMessage;
25  import org.apache.struts.action.ActionMessages;
26  import org.kuali.rice.core.api.config.CoreConfigHelper;
27  import org.kuali.rice.core.api.config.property.ConfigContext;
28  import org.kuali.rice.core.api.util.ConcreteKeyValue;
29  import org.kuali.rice.core.api.util.RiceConstants;
30  import org.kuali.rice.core.api.util.RiceUtilities;
31  import org.kuali.rice.core.api.util.io.SerializationUtils;
32  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
33  import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
34  import org.kuali.rice.ksb.api.registry.ServiceInfo;
35  import org.kuali.rice.ksb.messaging.MessageFetcher;
36  import org.kuali.rice.ksb.messaging.MessageServiceInvoker;
37  import org.kuali.rice.ksb.messaging.PersistedMessageBO;
38  import org.kuali.rice.ksb.messaging.service.MessageQueueService;
39  import org.kuali.rice.ksb.service.KSBServiceLocator;
40  import org.kuali.rice.ksb.util.KSBConstants;
41  
42  import javax.servlet.ServletException;
43  import javax.servlet.http.HttpServletRequest;
44  import javax.servlet.http.HttpServletResponse;
45  import java.io.IOException;
46  import java.sql.Timestamp;
47  import java.util.ArrayList;
48  import java.util.Calendar;
49  import java.util.Collections;
50  import java.util.Comparator;
51  import java.util.Date;
52  import java.util.HashMap;
53  import java.util.Iterator;
54  import java.util.List;
55  import java.util.Map;
56  
57  
58  /**
59   * Struts action for interacting with the queue of messages.
60   *
61   * @author Kuali Rice Team (rice.collab@kuali.org)
62   */
63  public class MessageQueueAction extends KSBAction {
64  
65      @Override
66  	public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request,
67  	    HttpServletResponse response) throws IOException, ServletException {
68  	return mapping.findForward("report");
69      }
70  
71      public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request,
72  	    HttpServletResponse response) throws Exception {
73  	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
74  	save(routeQueueForm);
75  
76  	routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
77  	ActionMessages messages = new ActionMessages();
78  	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved"));
79  	saveMessages(request, messages);
80  
81  //	routeQueueForm.setMessageId(null);
82  ////	routeQueueForm.setMessageQueueFromDatabase(null);
83  ////	routeQueueForm.setMessageQueueFromForm(null);
84  //	routeQueueForm.setShowEdit("yes");
85  //	routeQueueForm.setMethodToCall("");
86  //	establishRequiredState(request, form);
87  //	routeQueueForm.setMessageId(routeQueueId);
88  ////	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
89  //	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
90  //	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
91  	return mapping.findForward("report");
92      }
93  
94      public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
95  	    HttpServletResponse response) throws Exception {
96  	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
97  	PersistedMessageBO message = save(routeQueueForm);
98  	KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
99  
100 	ActionMessages messages = new ActionMessages();
101 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
102 	saveMessages(request, messages);
103 
104 	routeQueueForm.setMessageId(null);
105 	routeQueueForm.setMessageQueueFromDatabase(null);
106 	routeQueueForm.setMessageQueueFromForm(null);
107 	routeQueueForm.setShowEdit("yes");
108 	routeQueueForm.setMethodToCall("");
109 	establishRequiredState(request, form);
110 	routeQueueForm.setMessageId(message.getRouteQueueId());
111 	routeQueueForm.setMessageQueueFromForm(message);
112 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
113 	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
114 	return mapping.findForward("report");
115     }
116 
117 //    public ActionForward saveAndForward(ActionMapping mapping, ActionForm form, HttpServletRequest request,
118 //	    HttpServletResponse response) throws Exception {
119 //	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
120 //	PersistedMessageBO message = save(routeQueueForm);
121 //	ForwardedCallHandler adminService = getAdminServiceToForwardTo(message, routeQueueForm);
122 //	AsynchronousCall methodCall = message.getPayload().getMethodCall();
123 //	message.setMethodCall(methodCall);
124 //	adminService.handleCall(message);
125 //	KSBServiceLocator.getRouteQueueService().delete(message);
126 //
127 //	ActionMessages messages = new ActionMessages();
128 //	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
129 //	saveMessages(request, messages);
130 //
131 //	routeQueueForm.setMessageId(null);
132 //	routeQueueForm.setMessageQueueFromDatabase(null);
133 //	routeQueueForm.setMessageQueueFromForm(null);
134 //	routeQueueForm.setShowEdit("yes");
135 //	routeQueueForm.setMethodToCall("");
136 //	establishRequiredState(request, form);
137 //	routeQueueForm.setMessageId(message.getRouteQueueId());
138 //	routeQueueForm.setMessageQueueFromForm(message);
139 //	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
140 //	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
141 //	return mapping.findForward("report");
142 //    }
143 
144     private PersistedMessageBO save(MessageQueueForm routeQueueForm) {
145 	Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
146 	if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
147 	    throw new IllegalArgumentException("Invalid routeQueueId passed in.  Cannot save");
148 	}
149 	// save the message
150 	PersistedMessageBO existingMessage = KSBServiceLocator.getMessageQueueService().findByRouteQueueId(routeQueueId);
151 	PersistedMessageBO message = routeQueueForm.getMessageQueueFromForm();
152 	// copy the new values over
153 	if (existingMessage == null) {
154 	    throw new RuntimeException("Could locate the existing message, it may have already been processed.");
155 	}
156 
157 	existingMessage.setQueuePriority(message.getQueuePriority());
158 	existingMessage.setIpNumber(message.getIpNumber());
159 	existingMessage.setLockVerNbr(message.getLockVerNbr());
160 	existingMessage.setApplicationId(message.getApplicationId());
161 	existingMessage.setMethodName(message.getMethodName());
162 	existingMessage.setQueueStatus(message.getQueueStatus());
163 	existingMessage.setRetryCount(message.getRetryCount());
164 	existingMessage.setServiceName(message.getServiceName());
165 	existingMessage.setValue1(message.getValue1());
166 	existingMessage.setValue2(message.getValue2());
167 	KSBServiceLocator.getMessageQueueService().save(existingMessage);
168 	return existingMessage;
169     }
170 
171 //    private ForwardedCallHandler getAdminServiceToForwardTo(PersistedMessageBO message, MessageQueueForm form) {
172 //	String ip = form.getIpAddress();
173 //	List<ServiceInfo> services = KSBServiceLocator.getServiceRegistry().fetchAll();
174 //	for (ServiceInfo service : services) {
175 //	    if (service.getQname().getLocalPart().equals(
176 //		    QName.valueOf(message.getServiceName()).getLocalPart() + "-forwardHandler")
177 //		    && service.getServerIp().equals(ip)) {
178 //		// retrieve a reference to the remote service
179 //		RemoteResourceServiceLocator remoteResourceLocator = KSBResourceLoaderFactory.getRemoteResourceLocator();
180 //		ForwardedCallHandler handler = (ForwardedCallHandler) remoteResourceLocator.getService(service.getQname(), service.getEndpointUrl());
181 //		if (handler != null) {
182 //		    return handler;
183 //		} else {
184 //		    LOG.warn("Failed to find forwarded call handler for service: " + service.getQname().toString() + " and endpoint URL: " + service.getEndpointUrl());
185 //		}
186 //	    }
187 //	}
188 //	throw new RuntimeException("Could not locate the BusAdminService for ip " + ip
189 //		+ " in order to forward the message.");
190 //    }
191 
192     /**
193          * Performs a quick ReQueue of the indicated persisted message.
194          *
195          * The net effect of this requeue is to set the Date to now, and to reset the RetryCount to zero. The payload is not
196          * modified.
197          *
198          * @param message
199          *                The populated message to be requeued.
200          */
201     protected void quickRequeueMessage(PersistedMessageBO message) {
202 	message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
203 	message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis()));
204 	message.setRetryCount(new Integer(0));
205 	getRouteQueueService().save(message);
206     }
207 
208     public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request,
209 	    HttpServletResponse response) throws Exception {
210 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
211 	if (routeQueueForm.getMessageQueueFromDatabase() == null) {
212 	    throw new IllegalArgumentException("No messageId passed in with the Request.");
213 	}
214 
215 	PersistedMessageBO message = routeQueueForm.getMessageQueueFromDatabase();
216 	quickRequeueMessage(message);
217 	KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
218 
219 	ActionMessages messages = new ActionMessages();
220 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued"));
221 	saveMessages(request, messages);
222 
223 	routeQueueForm.setMessageQueueFromDatabase(null);
224 	routeQueueForm.setMessageQueueFromForm(null);
225 	routeQueueForm.setMessageId(null);
226 	routeQueueForm.setMethodToCall("");
227 
228 	// re-run the state method to load the full set of rows
229 	establishRequiredState(request, form);
230 	return mapping.findForward("report");
231     }
232 
233     public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
234 	    HttpServletResponse response) throws IOException, ServletException {
235 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
236 	routeQueueForm.setShowEdit("yes");
237 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
238 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
239 	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
240 	return mapping.findForward("basic");
241     }
242 
243     public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request,
244 	    HttpServletResponse response) throws Exception {
245 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
246 	routeQueueForm.setShowEdit("no");
247 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
248 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
249 	AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase());
250 	routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload);
251 	return mapping.findForward("payload");
252     }
253 
254     public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request,
255 	    HttpServletResponse response) throws Exception {
256 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
257 	if (routeQueueForm.getShowEdit().equals("yes")) {
258 	    routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
259 	}
260 	return mapping.findForward("basic");
261     }
262 
263     public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request,
264 	    HttpServletResponse response) throws Exception {
265 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
266 	routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
267 	routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
268 	routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
269 	routeQueueForm.getMessageQueueFromForm().setExpirationDate(null);
270 	routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
271 	routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
272 	routeQueueForm.getMessageQueueFromForm().setServiceName(null);
273 	routeQueueForm.getMessageQueueFromForm().setApplicationId(null);
274 	routeQueueForm.getMessageQueueFromForm().setMethodName(null);
275 	routeQueueForm.getMessageQueueFromForm().setPayload(null);
276 	routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
277 	routeQueueForm.setExistingQueueDate(null);
278 	routeQueueForm.setNewQueueDate(null);
279 	return mapping.findForward("basic");
280     }
281 
282     public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request,
283 	    HttpServletResponse response) throws Exception {
284 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
285 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
286 	routeQueueForm.setMessageQueueFromDatabase(null);
287 	getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm());
288 	ActionMessages messages = new ActionMessages();
289 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm
290 		.getMessageQueueFromForm().getRouteQueueId().toString()));
291 	saveMessages(request, messages);
292 	routeQueueForm.setMessageId(null);
293 	establishRequiredState(request, form);
294 	return mapping.findForward("report");
295     }
296 
297     public ActionForward executeMessageFetcher(ActionMapping mapping,  ActionForm form, HttpServletRequest request,
298 	    HttpServletResponse response) throws Exception {
299 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
300 	ActionMessages messages = new ActionMessages();
301 	if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
302 	    messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages()));
303 	}
304 	if (!messages.isEmpty()) {
305 	    saveMessages(request, messages);
306 	    return mapping.findForward("report");
307 	}
308 	new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run();
309 	return mapping.findForward("report");
310     }
311 
312     /**
313          * Sets up the expected state by retrieving the selected RouteQueue by RouteQueueId, and placing it in the
314          * ExistingRouteQueue member.
315          *
316          * Called by the super's Execute method on every request.
317          */
318     @Override
319 	public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception {
320 	request.setAttribute("rice_constant", getServlet().getServletContext().getAttribute("RiceConstants"));
321 	request.setAttribute("ksb_constant", getServlet().getServletContext().getAttribute("KSBConstants"));
322 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
323 	routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
324 	routeQueueForm.setMyApplicationId(CoreConfigHelper.getApplicationId());
325 	routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE));
326 	routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
327 	routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGING_OFF));
328 	List<ServiceInfo> services = KsbApiServiceLocator.getServiceRegistry().getAllOnlineServices();
329 	if (routeQueueForm.getMessageId() != null) {
330 	    PersistedMessageBO rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId());
331 	    if (rq != null) {
332 		routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date()));
333 		routeQueueForm.setMessageQueueFromDatabase(rq);
334 		// establish IP addresses where this message could safely be forwarded to
335 		String serviceName = rq.getServiceName();
336 		for (ServiceInfo serviceInfo : services) {
337 		    if (serviceInfo.getServiceName().equals(serviceName)) {
338 			routeQueueForm.getIpAddresses().add(
339 				new ConcreteKeyValue(serviceInfo.getServerIpAddress(), serviceInfo.getServerIpAddress()));
340 		    }
341 		}
342 	    } else {
343 		ActionMessages messages = new ActionMessages();
344 		messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
345 			"messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString()));
346 		return messages;
347 	    }
348 	    routeQueueForm.setMessageId(null);
349 	} else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) {
350 	    List<PersistedMessageBO> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1);
351 	    if (queueEntries.size() > 0) {
352 		Collections.sort(queueEntries, new Comparator() {
353 		    private Comparator comp = new ComparableComparator();
354 
355 		    @Override
356 			public int compare(Object object1, Object object2) {
357 			if (object1 == null && object2 == null) {
358 			    return 0;
359 			} else if (object1 == null) {
360 			    return 1;
361 			} else if (object2 == null) {
362 			    return -1;
363 			}
364 			Long id1 = ((PersistedMessageBO) object1).getRouteQueueId();
365 			Long id2 = ((PersistedMessageBO) object2).getRouteQueueId();
366 
367 			try {
368 			    return this.comp.compare(id1, id2);
369 			} catch (Exception e) {
370 			    return 0;
371 			}
372 		    }
373 		});
374 	    }
375 	    routeQueueForm.setMessageQueueRows(queueEntries);
376 	}
377 	return null;
378     }
379 
380     protected List<PersistedMessageBO> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) {
381 	List<PersistedMessageBO> routeQueues = new ArrayList<PersistedMessageBO>();
382 
383 	// no filter applied
384 	if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
385 	    routeQueues.addAll(getRouteQueueService().findAll(maxRows));
386 	}
387 
388 	// one or more filters applied
389 	else {
390 	    if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) {
391 		if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) {
392 		    // TODO better error handling here
393 		    throw new RuntimeException("Message Id must be a number.");
394 		}
395 	    }
396 
397 	    Map<String, String> criteriaValues = new HashMap<String, String>();
398 	    String key = null;
399 	    String value = null;
400 	    String trimmedKey = null;
401 	    for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) {
402 		key = (String) iter.next();
403 		if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
404 		    value = request.getParameter(key);
405 		    if (StringUtils.isNotBlank(value)) {
406 			trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX));
407 			criteriaValues.put(trimmedKey, value);
408 		    }
409 		}
410 	    }
411 	    routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows));
412 	}
413 	return routeQueues;
414     }
415 
416     private MessageQueueService getRouteQueueService() {
417 	return KSBServiceLocator.getMessageQueueService();
418     }
419 
420     /**
421          * Extracts the payload from a PersistedMessageBO, attempts to convert it to the expected AsynchronousCall type, and
422          * returns it.
423          *
424          * Throws an IllegalArgumentException if the decoded payload isnt of the expected type.
425          *
426          * @param message
427          *                The populated PersistedMessageBO object to extract the payload from.
428          * @return Returns the payload if one is present and it can be deserialized, otherwise returns null.
429          */
430     protected AsynchronousCall unwrapPayload(PersistedMessageBO message) {
431 	if (message == null || message.getPayload() == null) {
432 	    return null;
433 	}
434 	String encodedPayload = message.getPayload().getPayload();
435 	if (StringUtils.isBlank(encodedPayload)) {
436 	    return null;
437 	}
438 	Object decodedPayload = null;
439 	if (encodedPayload != null) {
440 	    decodedPayload = SerializationUtils.deserializeFromBase64(encodedPayload);
441 	}
442 	// fail fast if its not the expected type of AsynchronousCall
443 	if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) {
444 	    throw new IllegalArgumentException("PersistedMessageBO payload was not of the expected class. " + "Expected was ["
445 		    + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]");
446 	}
447 	return (AsynchronousCall) decodedPayload;
448     }
449 
450 }