View Javadoc

1   /*
2    * Copyright 2005-2007 The Kuali Foundation
3    *
4    *
5    * Licensed under the Educational Community License, Version 2.0 (the "License"); you may not use this file except in
6    * compliance with the License. You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS
11   * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
12   * language governing permissions and limitations under the License.
13   */
14  package org.kuali.rice.ksb.messaging.web;
15  
16  import java.io.IOException;
17  import java.sql.Timestamp;
18  import java.util.ArrayList;
19  import java.util.Calendar;
20  import java.util.Collections;
21  import java.util.Comparator;
22  import java.util.Date;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  
28  import javax.servlet.ServletException;
29  import javax.servlet.http.HttpServletRequest;
30  import javax.servlet.http.HttpServletResponse;
31  import javax.xml.namespace.QName;
32  
33  import org.apache.commons.collections.comparators.ComparableComparator;
34  import org.apache.commons.lang.StringUtils;
35  import org.apache.commons.lang.math.NumberUtils;
36  import org.apache.struts.action.ActionForm;
37  import org.apache.struts.action.ActionForward;
38  import org.apache.struts.action.ActionMapping;
39  import org.apache.struts.action.ActionMessage;
40  import org.apache.struts.action.ActionMessages;
41  import org.kuali.rice.core.config.ConfigContext;
42  import org.kuali.rice.core.util.JSTLConstants;
43  import org.kuali.rice.core.util.RiceConstants;
44  import org.kuali.rice.core.util.RiceUtilities;
45  import org.kuali.rice.ksb.messaging.AsynchronousCall;
46  import org.kuali.rice.ksb.messaging.MessageFetcher;
47  import org.kuali.rice.ksb.messaging.MessageServiceInvoker;
48  import org.kuali.rice.ksb.messaging.PersistedMessage;
49  import org.kuali.rice.ksb.messaging.RemoteResourceServiceLocator;
50  import org.kuali.rice.ksb.messaging.ServiceInfo;
51  import org.kuali.rice.ksb.messaging.callforwarding.ForwardedCallHandler;
52  import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
53  import org.kuali.rice.ksb.messaging.service.MessageQueueService;
54  import org.kuali.rice.ksb.service.KSBServiceLocator;
55  import org.kuali.rice.ksb.util.KSBConstants;
56  
57  
58  /**
59   * Struts action for interacting with the queue of messages.
60   *
61   * @author Kuali Rice Team (rice.collab@kuali.org)
62   */
63  public class MessageQueueAction extends KSBAction {
64  
65      private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(MessageQueueAction.class);
66  
67      public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request,
68  	    HttpServletResponse response) throws IOException, ServletException {
69  	return mapping.findForward("report");
70      }
71  
72      public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request,
73  	    HttpServletResponse response) throws Exception {
74  	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
75  	save(routeQueueForm);
76  
77  	Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
78  	ActionMessages messages = new ActionMessages();
79  	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved"));
80  	saveMessages(request, messages);
81  
82  //	routeQueueForm.setMessageId(null);
83  ////	routeQueueForm.setMessageQueueFromDatabase(null);
84  ////	routeQueueForm.setMessageQueueFromForm(null);
85  //	routeQueueForm.setShowEdit("yes");
86  //	routeQueueForm.setMethodToCall("");
87  //	establishRequiredState(request, form);
88  //	routeQueueForm.setMessageId(routeQueueId);
89  ////	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
90  //	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
91  //	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
92  	return mapping.findForward("report");
93      }
94  
95      public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
96  	    HttpServletResponse response) throws Exception {
97  	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
98  	PersistedMessage message = save(routeQueueForm);
99  	KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
100 
101 	ActionMessages messages = new ActionMessages();
102 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
103 	saveMessages(request, messages);
104 
105 	routeQueueForm.setMessageId(null);
106 	routeQueueForm.setMessageQueueFromDatabase(null);
107 	routeQueueForm.setMessageQueueFromForm(null);
108 	routeQueueForm.setShowEdit("yes");
109 	routeQueueForm.setMethodToCall("");
110 	establishRequiredState(request, form);
111 	routeQueueForm.setMessageId(message.getRouteQueueId());
112 	routeQueueForm.setMessageQueueFromForm(message);
113 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
114 	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
115 	return mapping.findForward("report");
116     }
117 
118     public ActionForward saveAndForward(ActionMapping mapping, ActionForm form, HttpServletRequest request,
119 	    HttpServletResponse response) throws Exception {
120 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
121 	PersistedMessage message = save(routeQueueForm);
122 	ForwardedCallHandler adminService = getAdminServiceToForwardTo(message, routeQueueForm);
123 	AsynchronousCall methodCall = message.getPayload().getMethodCall();
124 	message.setMethodCall(methodCall);
125 	adminService.handleCall(message);
126 	KSBServiceLocator.getRouteQueueService().delete(message);
127 
128 	ActionMessages messages = new ActionMessages();
129 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
130 	saveMessages(request, messages);
131 
132 	routeQueueForm.setMessageId(null);
133 	routeQueueForm.setMessageQueueFromDatabase(null);
134 	routeQueueForm.setMessageQueueFromForm(null);
135 	routeQueueForm.setShowEdit("yes");
136 	routeQueueForm.setMethodToCall("");
137 	establishRequiredState(request, form);
138 	routeQueueForm.setMessageId(message.getRouteQueueId());
139 	routeQueueForm.setMessageQueueFromForm(message);
140 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
141 	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
142 	return mapping.findForward("report");
143     }
144 
145     private PersistedMessage save(MessageQueueForm routeQueueForm) {
146 	Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
147 	if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
148 	    throw new IllegalArgumentException("Invalid routeQueueId passed in.  Cannot save");
149 	}
150 	// save the message
151 	PersistedMessage existingMessage = KSBServiceLocator.getRouteQueueService().findByRouteQueueId(routeQueueId);
152 	PersistedMessage message = routeQueueForm.getMessageQueueFromForm();
153 	// copy the new values over
154 	if (existingMessage == null) {
155 	    throw new RuntimeException("Could locate the existing message, it may have already been processed.");
156 	}
157 
158 	existingMessage.setQueuePriority(message.getQueuePriority());
159 	existingMessage.setIpNumber(message.getIpNumber());
160 	existingMessage.setLockVerNbr(message.getLockVerNbr());
161 	existingMessage.setServiceNamespace(message.getServiceNamespace());
162 	existingMessage.setMethodName(message.getMethodName());
163 	existingMessage.setQueueStatus(message.getQueueStatus());
164 	existingMessage.setRetryCount(message.getRetryCount());
165 	existingMessage.setServiceName(message.getServiceName());
166 	existingMessage.setValue1(message.getValue1());
167 	existingMessage.setValue2(message.getValue2());
168 	KSBServiceLocator.getRouteQueueService().save(existingMessage);
169 	return existingMessage;
170     }
171 
172     private ForwardedCallHandler getAdminServiceToForwardTo(PersistedMessage message, MessageQueueForm form) {
173 	String ip = form.getIpAddress();
174 	List<ServiceInfo> services = KSBServiceLocator.getServiceRegistry().fetchAll();
175 	for (ServiceInfo service : services) {
176 	    if (service.getQname().getLocalPart().equals(
177 		    QName.valueOf(message.getServiceName()).getLocalPart() + "-forwardHandler")
178 		    && service.getServerIp().equals(ip)) {
179 		// retrieve a reference to the remote service
180 		RemoteResourceServiceLocator remoteResourceLocator = KSBResourceLoaderFactory.getRemoteResourceLocator();
181 		ForwardedCallHandler handler = (ForwardedCallHandler) remoteResourceLocator.getService(service.getQname(), service.getEndpointUrl());
182 		if (handler != null) {
183 		    return handler;
184 		} else {
185 		    LOG.warn("Failed to find forwarded call handler for service: " + service.getQname().toString() + " and endpoint URL: " + service.getEndpointUrl());
186 		}
187 	    }
188 	}
189 	throw new RuntimeException("Could not locate the BusAdminService for ip " + ip
190 		+ " in order to forward the message.");
191     }
192 
193     /**
194          * Performs a quick ReQueue of the indicated persisted message.
195          *
196          * The net effect of this requeue is to set the Date to now, and to reset the RetryCount to zero. The payload is not
197          * modified.
198          *
199          * @param message
200          *                The populated message to be requeued.
201          */
202     protected void quickRequeueMessage(PersistedMessage message) {
203 	message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
204 	message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis()));
205 	message.setRetryCount(new Integer(0));
206 	getRouteQueueService().save(message);
207     }
208 
209     public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request,
210 	    HttpServletResponse response) throws Exception {
211 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
212 	if (routeQueueForm.getMessageQueueFromDatabase() == null) {
213 	    throw new IllegalArgumentException("No messageId passed in with the Request.");
214 	}
215 
216 	PersistedMessage message = routeQueueForm.getMessageQueueFromDatabase();
217 	quickRequeueMessage(message);
218 	KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
219 
220 	ActionMessages messages = new ActionMessages();
221 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued"));
222 	saveMessages(request, messages);
223 
224 	routeQueueForm.setMessageQueueFromDatabase(null);
225 	routeQueueForm.setMessageQueueFromForm(null);
226 	routeQueueForm.setMessageId(null);
227 	routeQueueForm.setMethodToCall("");
228 
229 	// re-run the state method to load the full set of rows
230 	establishRequiredState(request, form);
231 	return mapping.findForward("report");
232     }
233 
234     public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
235 	    HttpServletResponse response) throws IOException, ServletException {
236 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
237 	routeQueueForm.setShowEdit("yes");
238 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
239 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
240 	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
241 	return mapping.findForward("basic");
242     }
243 
244     public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request,
245 	    HttpServletResponse response) throws Exception {
246 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
247 	routeQueueForm.setShowEdit("no");
248 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
249 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
250 	AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase());
251 	routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload);
252 	return mapping.findForward("payload");
253     }
254 
255     public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request,
256 	    HttpServletResponse response) throws Exception {
257 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
258 	if (routeQueueForm.getShowEdit().equals("yes")) {
259 	    routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
260 	}
261 	return mapping.findForward("basic");
262     }
263 
264     public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request,
265 	    HttpServletResponse response) throws Exception {
266 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
267 	routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
268 	routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
269 	routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
270 	routeQueueForm.getMessageQueueFromForm().setExpirationDate(null);
271 	routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
272 	routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
273 	routeQueueForm.getMessageQueueFromForm().setServiceName(null);
274 	routeQueueForm.getMessageQueueFromForm().setServiceNamespace(null);
275 	routeQueueForm.getMessageQueueFromForm().setMethodName(null);
276 	routeQueueForm.getMessageQueueFromForm().setPayload(null);
277 	routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
278 	routeQueueForm.setExistingQueueDate(null);
279 	routeQueueForm.setNewQueueDate(null);
280 	return mapping.findForward("basic");
281     }
282 
283     public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request,
284 	    HttpServletResponse response) throws Exception {
285 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
286 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
287 	routeQueueForm.setMessageQueueFromDatabase(null);
288 	getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm());
289 	ActionMessages messages = new ActionMessages();
290 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm
291 		.getMessageQueueFromForm().getRouteQueueId().toString()));
292 	saveMessages(request, messages);
293 	routeQueueForm.setMessageId(null);
294 	establishRequiredState(request, form);
295 	return mapping.findForward("report");
296     }
297 
298     public ActionForward executeMessageFetcher(ActionMapping mapping,  ActionForm form, HttpServletRequest request,
299 	    HttpServletResponse response) throws Exception {
300 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
301 	ActionMessages messages = new ActionMessages();
302 	if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
303 	    messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages()));
304 	}
305 	if (!messages.isEmpty()) {
306 	    saveMessages(request, messages);
307 	    return mapping.findForward("report");
308 	}
309 	new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run();
310 	return mapping.findForward("report");
311     }
312 
313     /**
314          * Sets up the expected state by retrieving the selected RouteQueue by RouteQueueId, and placing it in the
315          * ExistingRouteQueue member.
316          *
317          * Called by the super's Execute method on every request.
318          */
319     public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception {
320 	request.setAttribute("rice_constant", new JSTLConstants(RiceConstants.class));
321 	request.setAttribute("ksb_constant", new KSBConstants());
322 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
323 	routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
324 	routeQueueForm.setMyServiceNamespace(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.SERVICE_NAMESPACE));
325 	routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.MESSAGE_PERSISTENCE));
326 	routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.MESSAGE_DELIVERY));
327 	routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.MESSAGING_OFF));
328 	List<ServiceInfo> services = KSBServiceLocator.getServiceRegistry().fetchAll();
329 	if (routeQueueForm.getMessageId() != null) {
330 	    PersistedMessage rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId());
331 	    if (rq != null) {
332 		routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date()));
333 		routeQueueForm.setMessageQueueFromDatabase(rq);
334 		// establish IP addresses where this message could safely be forwarded to
335 		String serviceName = rq.getServiceName();
336 		for (ServiceInfo serviceInfo : services) {
337 		    if (serviceInfo.getServiceName().equals(serviceName)) {
338 			routeQueueForm.getIpAddresses().add(
339 				new ValueLabelPair(serviceInfo.getServerIp(), serviceInfo.getServerIp()));
340 		    }
341 		}
342 	    } else {
343 		ActionMessages messages = new ActionMessages();
344 		messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
345 			"messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString()));
346 		return messages;
347 	    }
348 	    routeQueueForm.setMessageId(null);
349 	} else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) {
350 	    List<PersistedMessage> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1);
351 	    if (queueEntries.size() > 0) {
352 		Collections.sort(queueEntries, new Comparator() {
353 		    private Comparator comp = new ComparableComparator();
354 
355 		    public int compare(Object object1, Object object2) {
356 			if (object1 == null && object2 == null) {
357 			    return 0;
358 			} else if (object1 == null) {
359 			    return 1;
360 			} else if (object2 == null) {
361 			    return -1;
362 			}
363 			Long id1 = ((PersistedMessage) object1).getRouteQueueId();
364 			Long id2 = ((PersistedMessage) object2).getRouteQueueId();
365 
366 			try {
367 			    return this.comp.compare(id1, id2);
368 			} catch (Exception e) {
369 			    return 0;
370 			}
371 		    }
372 		});
373 	    }
374 	    routeQueueForm.setMessageQueueRows(queueEntries);
375 	}
376 	return null;
377     }
378 
379     protected List<PersistedMessage> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) {
380 	List<PersistedMessage> routeQueues = new ArrayList<PersistedMessage>();
381 
382 	// no filter applied
383 	if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
384 	    routeQueues.addAll(getRouteQueueService().findAll(maxRows));
385 	}
386 
387 	// one or more filters applied
388 	else {
389 	    if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) {
390 		if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) {
391 		    // TODO better error handling here
392 		    throw new RuntimeException("Message Id must be a number.");
393 		}
394 	    }
395 
396 	    Map<String, String> criteriaValues = new HashMap<String, String>();
397 	    String key = null;
398 	    String value = null;
399 	    String trimmedKey = null;
400 	    for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) {
401 		key = (String) iter.next();
402 		if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
403 		    value = request.getParameter(key);
404 		    if (StringUtils.isNotBlank(value)) {
405 			trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX));
406 			criteriaValues.put(trimmedKey, value);
407 		    }
408 		}
409 	    }
410 	    routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows));
411 	}
412 	return routeQueues;
413     }
414 
415     private MessageQueueService getRouteQueueService() {
416 	return KSBServiceLocator.getRouteQueueService();
417     }
418 
419     /**
420          * Extracts the payload from a PersistedMessage, attempts to convert it to the expected AsynchronousCall type, and
421          * returns it.
422          *
423          * Throws an IllegalArgumentException if the decoded payload isnt of the expected type.
424          *
425          * @param message
426          *                The populated PersistedMessage object to extract the payload from.
427          * @return Returns the payload if one is present and it can be deserialized, otherwise returns null.
428          */
429     protected AsynchronousCall unwrapPayload(PersistedMessage message) {
430 	if (message == null || message.getPayload() == null) {
431 	    return null;
432 	}
433 	String encodedPayload = message.getPayload().getPayload();
434 	if (StringUtils.isBlank(encodedPayload)) {
435 	    return null;
436 	}
437 	Object decodedPayload = null;
438 	if (encodedPayload != null) {
439 	    decodedPayload = KSBServiceLocator.getMessageHelper().deserializeObject(encodedPayload);
440 	}
441 	// fail fast if its not the expected type of AsynchronousCall
442 	if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) {
443 	    throw new IllegalArgumentException("PersistedMessage payload was not of the expected class. " + "Expected was ["
444 		    + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]");
445 	}
446 	return (AsynchronousCall) decodedPayload;
447     }
448 
449 }