View Javadoc

1   /*
2    * Copyright 2006-2011 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.kuali.rice.ksb.messaging.web;
18  
19  import org.apache.commons.collections.comparators.ComparableComparator;
20  import org.apache.commons.lang.StringUtils;
21  import org.apache.commons.lang.math.NumberUtils;
22  import org.apache.struts.action.ActionForm;
23  import org.apache.struts.action.ActionForward;
24  import org.apache.struts.action.ActionMapping;
25  import org.apache.struts.action.ActionMessage;
26  import org.apache.struts.action.ActionMessages;
27  import org.kuali.rice.core.api.config.CoreConfigHelper;
28  import org.kuali.rice.core.api.config.property.ConfigContext;
29  import org.kuali.rice.core.api.util.ConcreteKeyValue;
30  import org.kuali.rice.core.api.util.RiceConstants;
31  import org.kuali.rice.core.api.util.RiceUtilities;
32  import org.kuali.rice.core.api.util.io.SerializationUtils;
33  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
34  import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
35  import org.kuali.rice.ksb.api.registry.ServiceInfo;
36  import org.kuali.rice.ksb.messaging.MessageFetcher;
37  import org.kuali.rice.ksb.messaging.MessageServiceInvoker;
38  import org.kuali.rice.ksb.messaging.PersistedMessageBO;
39  import org.kuali.rice.ksb.messaging.service.MessageQueueService;
40  import org.kuali.rice.ksb.service.KSBServiceLocator;
41  import org.kuali.rice.ksb.util.KSBConstants;
42  
43  import javax.servlet.ServletException;
44  import javax.servlet.http.HttpServletRequest;
45  import javax.servlet.http.HttpServletResponse;
46  import java.io.IOException;
47  import java.sql.Timestamp;
48  import java.util.ArrayList;
49  import java.util.Calendar;
50  import java.util.Collections;
51  import java.util.Comparator;
52  import java.util.Date;
53  import java.util.HashMap;
54  import java.util.Iterator;
55  import java.util.List;
56  import java.util.Map;
57  
58  
59  /**
60   * Struts action for interacting with the queue of messages.
61   *
62   * @author Kuali Rice Team (rice.collab@kuali.org)
63   */
64  public class MessageQueueAction extends KSBAction {
65  
66      @Override
67  	public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request,
68  	    HttpServletResponse response) throws IOException, ServletException {
69  	return mapping.findForward("report");
70      }
71  
72      public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request,
73  	    HttpServletResponse response) throws Exception {
74  	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
75  	save(routeQueueForm);
76  
77  	routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
78  	ActionMessages messages = new ActionMessages();
79  	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved"));
80  	saveMessages(request, messages);
81  
82  //	routeQueueForm.setMessageId(null);
83  ////	routeQueueForm.setMessageQueueFromDatabase(null);
84  ////	routeQueueForm.setMessageQueueFromForm(null);
85  //	routeQueueForm.setShowEdit("yes");
86  //	routeQueueForm.setMethodToCall("");
87  //	establishRequiredState(request, form);
88  //	routeQueueForm.setMessageId(routeQueueId);
89  ////	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
90  //	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
91  //	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
92  	return mapping.findForward("report");
93      }
94  
95      public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
96  	    HttpServletResponse response) throws Exception {
97  	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
98  	PersistedMessageBO message = save(routeQueueForm);
99  	KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
100 
101 	ActionMessages messages = new ActionMessages();
102 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
103 	saveMessages(request, messages);
104 
105 	routeQueueForm.setMessageId(null);
106 	routeQueueForm.setMessageQueueFromDatabase(null);
107 	routeQueueForm.setMessageQueueFromForm(null);
108 	routeQueueForm.setShowEdit("yes");
109 	routeQueueForm.setMethodToCall("");
110 	establishRequiredState(request, form);
111 	routeQueueForm.setMessageId(message.getRouteQueueId());
112 	routeQueueForm.setMessageQueueFromForm(message);
113 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
114 	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
115 	return mapping.findForward("report");
116     }
117 
118 //    public ActionForward saveAndForward(ActionMapping mapping, ActionForm form, HttpServletRequest request,
119 //	    HttpServletResponse response) throws Exception {
120 //	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
121 //	PersistedMessageBO message = save(routeQueueForm);
122 //	ForwardedCallHandler adminService = getAdminServiceToForwardTo(message, routeQueueForm);
123 //	AsynchronousCall methodCall = message.getPayload().getMethodCall();
124 //	message.setMethodCall(methodCall);
125 //	adminService.handleCall(message);
126 //	KSBServiceLocator.getRouteQueueService().delete(message);
127 //
128 //	ActionMessages messages = new ActionMessages();
129 //	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
130 //	saveMessages(request, messages);
131 //
132 //	routeQueueForm.setMessageId(null);
133 //	routeQueueForm.setMessageQueueFromDatabase(null);
134 //	routeQueueForm.setMessageQueueFromForm(null);
135 //	routeQueueForm.setShowEdit("yes");
136 //	routeQueueForm.setMethodToCall("");
137 //	establishRequiredState(request, form);
138 //	routeQueueForm.setMessageId(message.getRouteQueueId());
139 //	routeQueueForm.setMessageQueueFromForm(message);
140 //	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
141 //	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
142 //	return mapping.findForward("report");
143 //    }
144 
145     private PersistedMessageBO save(MessageQueueForm routeQueueForm) {
146 	Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
147 	if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
148 	    throw new IllegalArgumentException("Invalid routeQueueId passed in.  Cannot save");
149 	}
150 	// save the message
151 	PersistedMessageBO existingMessage = KSBServiceLocator.getMessageQueueService().findByRouteQueueId(routeQueueId);
152 	PersistedMessageBO message = routeQueueForm.getMessageQueueFromForm();
153 	// copy the new values over
154 	if (existingMessage == null) {
155 	    throw new RuntimeException("Could locate the existing message, it may have already been processed.");
156 	}
157 
158 	existingMessage.setQueuePriority(message.getQueuePriority());
159 	existingMessage.setIpNumber(message.getIpNumber());
160 	existingMessage.setLockVerNbr(message.getLockVerNbr());
161 	existingMessage.setApplicationId(message.getApplicationId());
162 	existingMessage.setMethodName(message.getMethodName());
163 	existingMessage.setQueueStatus(message.getQueueStatus());
164 	existingMessage.setRetryCount(message.getRetryCount());
165 	existingMessage.setServiceName(message.getServiceName());
166 	existingMessage.setValue1(message.getValue1());
167 	existingMessage.setValue2(message.getValue2());
168 	KSBServiceLocator.getMessageQueueService().save(existingMessage);
169 	return existingMessage;
170     }
171 
172 //    private ForwardedCallHandler getAdminServiceToForwardTo(PersistedMessageBO message, MessageQueueForm form) {
173 //	String ip = form.getIpAddress();
174 //	List<ServiceInfo> services = KSBServiceLocator.getServiceRegistry().fetchAll();
175 //	for (ServiceInfo service : services) {
176 //	    if (service.getQname().getLocalPart().equals(
177 //		    QName.valueOf(message.getServiceName()).getLocalPart() + "-forwardHandler")
178 //		    && service.getServerIp().equals(ip)) {
179 //		// retrieve a reference to the remote service
180 //		RemoteResourceServiceLocator remoteResourceLocator = KSBResourceLoaderFactory.getRemoteResourceLocator();
181 //		ForwardedCallHandler handler = (ForwardedCallHandler) remoteResourceLocator.getService(service.getQname(), service.getEndpointUrl());
182 //		if (handler != null) {
183 //		    return handler;
184 //		} else {
185 //		    LOG.warn("Failed to find forwarded call handler for service: " + service.getQname().toString() + " and endpoint URL: " + service.getEndpointUrl());
186 //		}
187 //	    }
188 //	}
189 //	throw new RuntimeException("Could not locate the BusAdminService for ip " + ip
190 //		+ " in order to forward the message.");
191 //    }
192 
193     /**
194          * Performs a quick ReQueue of the indicated persisted message.
195          *
196          * The net effect of this requeue is to set the Date to now, and to reset the RetryCount to zero. The payload is not
197          * modified.
198          *
199          * @param message
200          *                The populated message to be requeued.
201          */
202     protected void quickRequeueMessage(PersistedMessageBO message) {
203 	message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
204 	message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis()));
205 	message.setRetryCount(new Integer(0));
206 	getRouteQueueService().save(message);
207     }
208 
209     public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request,
210 	    HttpServletResponse response) throws Exception {
211 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
212 	if (routeQueueForm.getMessageQueueFromDatabase() == null) {
213 	    throw new IllegalArgumentException("No messageId passed in with the Request.");
214 	}
215 
216 	PersistedMessageBO message = routeQueueForm.getMessageQueueFromDatabase();
217 	quickRequeueMessage(message);
218 	KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
219 
220 	ActionMessages messages = new ActionMessages();
221 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued"));
222 	saveMessages(request, messages);
223 
224 	routeQueueForm.setMessageQueueFromDatabase(null);
225 	routeQueueForm.setMessageQueueFromForm(null);
226 	routeQueueForm.setMessageId(null);
227 	routeQueueForm.setMethodToCall("");
228 
229 	// re-run the state method to load the full set of rows
230 	establishRequiredState(request, form);
231 	return mapping.findForward("report");
232     }
233 
234     public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
235 	    HttpServletResponse response) throws IOException, ServletException {
236 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
237 	routeQueueForm.setShowEdit("yes");
238 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
239 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
240 	routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
241 	return mapping.findForward("basic");
242     }
243 
244     public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request,
245 	    HttpServletResponse response) throws Exception {
246 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
247 	routeQueueForm.setShowEdit("no");
248 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
249 	routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
250 	AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase());
251 	routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload);
252 	return mapping.findForward("payload");
253     }
254 
255     public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request,
256 	    HttpServletResponse response) throws Exception {
257 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
258 	if (routeQueueForm.getShowEdit().equals("yes")) {
259 	    routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
260 	}
261 	return mapping.findForward("basic");
262     }
263 
264     public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request,
265 	    HttpServletResponse response) throws Exception {
266 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
267 	routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
268 	routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
269 	routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
270 	routeQueueForm.getMessageQueueFromForm().setExpirationDate(null);
271 	routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
272 	routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
273 	routeQueueForm.getMessageQueueFromForm().setServiceName(null);
274 	routeQueueForm.getMessageQueueFromForm().setApplicationId(null);
275 	routeQueueForm.getMessageQueueFromForm().setMethodName(null);
276 	routeQueueForm.getMessageQueueFromForm().setPayload(null);
277 	routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
278 	routeQueueForm.setExistingQueueDate(null);
279 	routeQueueForm.setNewQueueDate(null);
280 	return mapping.findForward("basic");
281     }
282 
283     public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request,
284 	    HttpServletResponse response) throws Exception {
285 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
286 	routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
287 	routeQueueForm.setMessageQueueFromDatabase(null);
288 	getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm());
289 	ActionMessages messages = new ActionMessages();
290 	messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm
291 		.getMessageQueueFromForm().getRouteQueueId().toString()));
292 	saveMessages(request, messages);
293 	routeQueueForm.setMessageId(null);
294 	establishRequiredState(request, form);
295 	return mapping.findForward("report");
296     }
297 
298     public ActionForward executeMessageFetcher(ActionMapping mapping,  ActionForm form, HttpServletRequest request,
299 	    HttpServletResponse response) throws Exception {
300 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
301 	ActionMessages messages = new ActionMessages();
302 	if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
303 	    messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages()));
304 	}
305 	if (!messages.isEmpty()) {
306 	    saveMessages(request, messages);
307 	    return mapping.findForward("report");
308 	}
309 	new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run();
310 	return mapping.findForward("report");
311     }
312 
313     /**
314          * Sets up the expected state by retrieving the selected RouteQueue by RouteQueueId, and placing it in the
315          * ExistingRouteQueue member.
316          *
317          * Called by the super's Execute method on every request.
318          */
319     @Override
320 	public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception {
321 	request.setAttribute("rice_constant", getServlet().getServletContext().getAttribute("RiceConstants"));
322 	request.setAttribute("ksb_constant", getServlet().getServletContext().getAttribute("KSBConstants"));
323 	MessageQueueForm routeQueueForm = (MessageQueueForm) form;
324 	routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
325 	routeQueueForm.setMyApplicationId(CoreConfigHelper.getApplicationId());
326 	routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE));
327 	routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
328 	routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGING_OFF));
329 	List<ServiceInfo> services = KsbApiServiceLocator.getServiceRegistry().getAllOnlineServices();
330 	if (routeQueueForm.getMessageId() != null) {
331 	    PersistedMessageBO rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId());
332 	    if (rq != null) {
333 		routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date()));
334 		routeQueueForm.setMessageQueueFromDatabase(rq);
335 		// establish IP addresses where this message could safely be forwarded to
336 		String serviceName = rq.getServiceName();
337 		for (ServiceInfo serviceInfo : services) {
338 		    if (serviceInfo.getServiceName().equals(serviceName)) {
339 			routeQueueForm.getIpAddresses().add(
340 				new ConcreteKeyValue(serviceInfo.getServerIpAddress(), serviceInfo.getServerIpAddress()));
341 		    }
342 		}
343 	    } else {
344 		ActionMessages messages = new ActionMessages();
345 		messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
346 			"messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString()));
347 		return messages;
348 	    }
349 	    routeQueueForm.setMessageId(null);
350 	} else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) {
351 	    List<PersistedMessageBO> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1);
352 	    if (queueEntries.size() > 0) {
353 		Collections.sort(queueEntries, new Comparator() {
354 		    private Comparator comp = new ComparableComparator();
355 
356 		    @Override
357 			public int compare(Object object1, Object object2) {
358 			if (object1 == null && object2 == null) {
359 			    return 0;
360 			} else if (object1 == null) {
361 			    return 1;
362 			} else if (object2 == null) {
363 			    return -1;
364 			}
365 			Long id1 = ((PersistedMessageBO) object1).getRouteQueueId();
366 			Long id2 = ((PersistedMessageBO) object2).getRouteQueueId();
367 
368 			try {
369 			    return this.comp.compare(id1, id2);
370 			} catch (Exception e) {
371 			    return 0;
372 			}
373 		    }
374 		});
375 	    }
376 	    routeQueueForm.setMessageQueueRows(queueEntries);
377 	}
378 	return null;
379     }
380 
381     protected List<PersistedMessageBO> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) {
382 	List<PersistedMessageBO> routeQueues = new ArrayList<PersistedMessageBO>();
383 
384 	// no filter applied
385 	if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
386 	    routeQueues.addAll(getRouteQueueService().findAll(maxRows));
387 	}
388 
389 	// one or more filters applied
390 	else {
391 	    if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) {
392 		if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) {
393 		    // TODO better error handling here
394 		    throw new RuntimeException("Message Id must be a number.");
395 		}
396 	    }
397 
398 	    Map<String, String> criteriaValues = new HashMap<String, String>();
399 	    String key = null;
400 	    String value = null;
401 	    String trimmedKey = null;
402 	    for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) {
403 		key = (String) iter.next();
404 		if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
405 		    value = request.getParameter(key);
406 		    if (StringUtils.isNotBlank(value)) {
407 			trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX));
408 			criteriaValues.put(trimmedKey, value);
409 		    }
410 		}
411 	    }
412 	    routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows));
413 	}
414 	return routeQueues;
415     }
416 
417     private MessageQueueService getRouteQueueService() {
418 	return KSBServiceLocator.getMessageQueueService();
419     }
420 
421     /**
422          * Extracts the payload from a PersistedMessageBO, attempts to convert it to the expected AsynchronousCall type, and
423          * returns it.
424          *
425          * Throws an IllegalArgumentException if the decoded payload isnt of the expected type.
426          *
427          * @param message
428          *                The populated PersistedMessageBO object to extract the payload from.
429          * @return Returns the payload if one is present and it can be deserialized, otherwise returns null.
430          */
431     protected AsynchronousCall unwrapPayload(PersistedMessageBO message) {
432 	if (message == null || message.getPayload() == null) {
433 	    return null;
434 	}
435 	String encodedPayload = message.getPayload().getPayload();
436 	if (StringUtils.isBlank(encodedPayload)) {
437 	    return null;
438 	}
439 	Object decodedPayload = null;
440 	if (encodedPayload != null) {
441 	    decodedPayload = SerializationUtils.deserializeFromBase64(encodedPayload);
442 	}
443 	// fail fast if its not the expected type of AsynchronousCall
444 	if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) {
445 	    throw new IllegalArgumentException("PersistedMessageBO payload was not of the expected class. " + "Expected was ["
446 		    + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]");
447 	}
448 	return (AsynchronousCall) decodedPayload;
449     }
450 
451 }