1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.kuali.rice.ksb.messaging.web;
15
16 import java.io.IOException;
17 import java.sql.Timestamp;
18 import java.util.ArrayList;
19 import java.util.Calendar;
20 import java.util.Collections;
21 import java.util.Comparator;
22 import java.util.Date;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Map;
27
28 import javax.servlet.ServletException;
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31 import javax.xml.namespace.QName;
32
33 import org.apache.commons.collections.comparators.ComparableComparator;
34 import org.apache.commons.lang.StringUtils;
35 import org.apache.commons.lang.math.NumberUtils;
36 import org.apache.struts.action.ActionForm;
37 import org.apache.struts.action.ActionForward;
38 import org.apache.struts.action.ActionMapping;
39 import org.apache.struts.action.ActionMessage;
40 import org.apache.struts.action.ActionMessages;
41 import org.kuali.rice.core.config.ConfigContext;
42 import org.kuali.rice.core.util.JSTLConstants;
43 import org.kuali.rice.core.util.RiceConstants;
44 import org.kuali.rice.core.util.RiceUtilities;
45 import org.kuali.rice.ksb.messaging.AsynchronousCall;
46 import org.kuali.rice.ksb.messaging.MessageFetcher;
47 import org.kuali.rice.ksb.messaging.MessageServiceInvoker;
48 import org.kuali.rice.ksb.messaging.PersistedMessage;
49 import org.kuali.rice.ksb.messaging.RemoteResourceServiceLocator;
50 import org.kuali.rice.ksb.messaging.ServiceInfo;
51 import org.kuali.rice.ksb.messaging.callforwarding.ForwardedCallHandler;
52 import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
53 import org.kuali.rice.ksb.messaging.service.MessageQueueService;
54 import org.kuali.rice.ksb.service.KSBServiceLocator;
55 import org.kuali.rice.ksb.util.KSBConstants;
56
57
58
59
60
61
62
63 public class MessageQueueAction extends KSBAction {
64
65 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(MessageQueueAction.class);
66
67 public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request,
68 HttpServletResponse response) throws IOException, ServletException {
69 return mapping.findForward("report");
70 }
71
72 public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request,
73 HttpServletResponse response) throws Exception {
74 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
75 save(routeQueueForm);
76
77 Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
78 ActionMessages messages = new ActionMessages();
79 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved"));
80 saveMessages(request, messages);
81
82
83
84
85
86
87
88
89
90
91
92 return mapping.findForward("report");
93 }
94
95 public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
96 HttpServletResponse response) throws Exception {
97 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
98 PersistedMessage message = save(routeQueueForm);
99 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
100
101 ActionMessages messages = new ActionMessages();
102 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
103 saveMessages(request, messages);
104
105 routeQueueForm.setMessageId(null);
106 routeQueueForm.setMessageQueueFromDatabase(null);
107 routeQueueForm.setMessageQueueFromForm(null);
108 routeQueueForm.setShowEdit("yes");
109 routeQueueForm.setMethodToCall("");
110 establishRequiredState(request, form);
111 routeQueueForm.setMessageId(message.getRouteQueueId());
112 routeQueueForm.setMessageQueueFromForm(message);
113 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
114 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
115 return mapping.findForward("report");
116 }
117
118 public ActionForward saveAndForward(ActionMapping mapping, ActionForm form, HttpServletRequest request,
119 HttpServletResponse response) throws Exception {
120 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
121 PersistedMessage message = save(routeQueueForm);
122 ForwardedCallHandler adminService = getAdminServiceToForwardTo(message, routeQueueForm);
123 AsynchronousCall methodCall = message.getPayload().getMethodCall();
124 message.setMethodCall(methodCall);
125 adminService.handleCall(message);
126 KSBServiceLocator.getRouteQueueService().delete(message);
127
128 ActionMessages messages = new ActionMessages();
129 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
130 saveMessages(request, messages);
131
132 routeQueueForm.setMessageId(null);
133 routeQueueForm.setMessageQueueFromDatabase(null);
134 routeQueueForm.setMessageQueueFromForm(null);
135 routeQueueForm.setShowEdit("yes");
136 routeQueueForm.setMethodToCall("");
137 establishRequiredState(request, form);
138 routeQueueForm.setMessageId(message.getRouteQueueId());
139 routeQueueForm.setMessageQueueFromForm(message);
140 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
141 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
142 return mapping.findForward("report");
143 }
144
145 private PersistedMessage save(MessageQueueForm routeQueueForm) {
146 Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
147 if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
148 throw new IllegalArgumentException("Invalid routeQueueId passed in. Cannot save");
149 }
150
151 PersistedMessage existingMessage = KSBServiceLocator.getRouteQueueService().findByRouteQueueId(routeQueueId);
152 PersistedMessage message = routeQueueForm.getMessageQueueFromForm();
153
154 if (existingMessage == null) {
155 throw new RuntimeException("Could locate the existing message, it may have already been processed.");
156 }
157
158 existingMessage.setQueuePriority(message.getQueuePriority());
159 existingMessage.setIpNumber(message.getIpNumber());
160 existingMessage.setLockVerNbr(message.getLockVerNbr());
161 existingMessage.setServiceNamespace(message.getServiceNamespace());
162 existingMessage.setMethodName(message.getMethodName());
163 existingMessage.setQueueStatus(message.getQueueStatus());
164 existingMessage.setRetryCount(message.getRetryCount());
165 existingMessage.setServiceName(message.getServiceName());
166 existingMessage.setValue1(message.getValue1());
167 existingMessage.setValue2(message.getValue2());
168 KSBServiceLocator.getRouteQueueService().save(existingMessage);
169 return existingMessage;
170 }
171
172 private ForwardedCallHandler getAdminServiceToForwardTo(PersistedMessage message, MessageQueueForm form) {
173 String ip = form.getIpAddress();
174 List<ServiceInfo> services = KSBServiceLocator.getServiceRegistry().fetchAll();
175 for (ServiceInfo service : services) {
176 if (service.getQname().getLocalPart().equals(
177 QName.valueOf(message.getServiceName()).getLocalPart() + "-forwardHandler")
178 && service.getServerIp().equals(ip)) {
179
180 RemoteResourceServiceLocator remoteResourceLocator = KSBResourceLoaderFactory.getRemoteResourceLocator();
181 ForwardedCallHandler handler = (ForwardedCallHandler) remoteResourceLocator.getService(service.getQname(), service.getEndpointUrl());
182 if (handler != null) {
183 return handler;
184 } else {
185 LOG.warn("Failed to find forwarded call handler for service: " + service.getQname().toString() + " and endpoint URL: " + service.getEndpointUrl());
186 }
187 }
188 }
189 throw new RuntimeException("Could not locate the BusAdminService for ip " + ip
190 + " in order to forward the message.");
191 }
192
193
194
195
196
197
198
199
200
201
202 protected void quickRequeueMessage(PersistedMessage message) {
203 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
204 message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis()));
205 message.setRetryCount(new Integer(0));
206 getRouteQueueService().save(message);
207 }
208
209 public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request,
210 HttpServletResponse response) throws Exception {
211 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
212 if (routeQueueForm.getMessageQueueFromDatabase() == null) {
213 throw new IllegalArgumentException("No messageId passed in with the Request.");
214 }
215
216 PersistedMessage message = routeQueueForm.getMessageQueueFromDatabase();
217 quickRequeueMessage(message);
218 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
219
220 ActionMessages messages = new ActionMessages();
221 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued"));
222 saveMessages(request, messages);
223
224 routeQueueForm.setMessageQueueFromDatabase(null);
225 routeQueueForm.setMessageQueueFromForm(null);
226 routeQueueForm.setMessageId(null);
227 routeQueueForm.setMethodToCall("");
228
229
230 establishRequiredState(request, form);
231 return mapping.findForward("report");
232 }
233
234 public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
235 HttpServletResponse response) throws IOException, ServletException {
236 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
237 routeQueueForm.setShowEdit("yes");
238 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
239 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
240 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
241 return mapping.findForward("basic");
242 }
243
244 public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request,
245 HttpServletResponse response) throws Exception {
246 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
247 routeQueueForm.setShowEdit("no");
248 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
249 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
250 AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase());
251 routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload);
252 return mapping.findForward("payload");
253 }
254
255 public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request,
256 HttpServletResponse response) throws Exception {
257 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
258 if (routeQueueForm.getShowEdit().equals("yes")) {
259 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
260 }
261 return mapping.findForward("basic");
262 }
263
264 public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request,
265 HttpServletResponse response) throws Exception {
266 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
267 routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
268 routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
269 routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
270 routeQueueForm.getMessageQueueFromForm().setExpirationDate(null);
271 routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
272 routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
273 routeQueueForm.getMessageQueueFromForm().setServiceName(null);
274 routeQueueForm.getMessageQueueFromForm().setServiceNamespace(null);
275 routeQueueForm.getMessageQueueFromForm().setMethodName(null);
276 routeQueueForm.getMessageQueueFromForm().setPayload(null);
277 routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
278 routeQueueForm.setExistingQueueDate(null);
279 routeQueueForm.setNewQueueDate(null);
280 return mapping.findForward("basic");
281 }
282
283 public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request,
284 HttpServletResponse response) throws Exception {
285 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
286 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
287 routeQueueForm.setMessageQueueFromDatabase(null);
288 getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm());
289 ActionMessages messages = new ActionMessages();
290 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm
291 .getMessageQueueFromForm().getRouteQueueId().toString()));
292 saveMessages(request, messages);
293 routeQueueForm.setMessageId(null);
294 establishRequiredState(request, form);
295 return mapping.findForward("report");
296 }
297
298 public ActionForward executeMessageFetcher(ActionMapping mapping, ActionForm form, HttpServletRequest request,
299 HttpServletResponse response) throws Exception {
300 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
301 ActionMessages messages = new ActionMessages();
302 if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
303 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages()));
304 }
305 if (!messages.isEmpty()) {
306 saveMessages(request, messages);
307 return mapping.findForward("report");
308 }
309 new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run();
310 return mapping.findForward("report");
311 }
312
313
314
315
316
317
318
319 public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception {
320 request.setAttribute("rice_constant", new JSTLConstants(RiceConstants.class));
321 request.setAttribute("ksb_constant", new KSBConstants());
322 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
323 routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
324 routeQueueForm.setMyServiceNamespace(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.SERVICE_NAMESPACE));
325 routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.MESSAGE_PERSISTENCE));
326 routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.MESSAGE_DELIVERY));
327 routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.MESSAGING_OFF));
328 List<ServiceInfo> services = KSBServiceLocator.getServiceRegistry().fetchAll();
329 if (routeQueueForm.getMessageId() != null) {
330 PersistedMessage rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId());
331 if (rq != null) {
332 routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date()));
333 routeQueueForm.setMessageQueueFromDatabase(rq);
334
335 String serviceName = rq.getServiceName();
336 for (ServiceInfo serviceInfo : services) {
337 if (serviceInfo.getServiceName().equals(serviceName)) {
338 routeQueueForm.getIpAddresses().add(
339 new ValueLabelPair(serviceInfo.getServerIp(), serviceInfo.getServerIp()));
340 }
341 }
342 } else {
343 ActionMessages messages = new ActionMessages();
344 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
345 "messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString()));
346 return messages;
347 }
348 routeQueueForm.setMessageId(null);
349 } else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) {
350 List<PersistedMessage> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1);
351 if (queueEntries.size() > 0) {
352 Collections.sort(queueEntries, new Comparator() {
353 private Comparator comp = new ComparableComparator();
354
355 public int compare(Object object1, Object object2) {
356 if (object1 == null && object2 == null) {
357 return 0;
358 } else if (object1 == null) {
359 return 1;
360 } else if (object2 == null) {
361 return -1;
362 }
363 Long id1 = ((PersistedMessage) object1).getRouteQueueId();
364 Long id2 = ((PersistedMessage) object2).getRouteQueueId();
365
366 try {
367 return this.comp.compare(id1, id2);
368 } catch (Exception e) {
369 return 0;
370 }
371 }
372 });
373 }
374 routeQueueForm.setMessageQueueRows(queueEntries);
375 }
376 return null;
377 }
378
379 protected List<PersistedMessage> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) {
380 List<PersistedMessage> routeQueues = new ArrayList<PersistedMessage>();
381
382
383 if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
384 routeQueues.addAll(getRouteQueueService().findAll(maxRows));
385 }
386
387
388 else {
389 if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) {
390 if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) {
391
392 throw new RuntimeException("Message Id must be a number.");
393 }
394 }
395
396 Map<String, String> criteriaValues = new HashMap<String, String>();
397 String key = null;
398 String value = null;
399 String trimmedKey = null;
400 for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) {
401 key = (String) iter.next();
402 if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
403 value = request.getParameter(key);
404 if (StringUtils.isNotBlank(value)) {
405 trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX));
406 criteriaValues.put(trimmedKey, value);
407 }
408 }
409 }
410 routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows));
411 }
412 return routeQueues;
413 }
414
415 private MessageQueueService getRouteQueueService() {
416 return KSBServiceLocator.getRouteQueueService();
417 }
418
419
420
421
422
423
424
425
426
427
428
429 protected AsynchronousCall unwrapPayload(PersistedMessage message) {
430 if (message == null || message.getPayload() == null) {
431 return null;
432 }
433 String encodedPayload = message.getPayload().getPayload();
434 if (StringUtils.isBlank(encodedPayload)) {
435 return null;
436 }
437 Object decodedPayload = null;
438 if (encodedPayload != null) {
439 decodedPayload = KSBServiceLocator.getMessageHelper().deserializeObject(encodedPayload);
440 }
441
442 if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) {
443 throw new IllegalArgumentException("PersistedMessage payload was not of the expected class. " + "Expected was ["
444 + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]");
445 }
446 return (AsynchronousCall) decodedPayload;
447 }
448
449 }