1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.kuali.rice.ksb.messaging.web;
18
19 import org.apache.commons.collections.comparators.ComparableComparator;
20 import org.apache.commons.lang.StringUtils;
21 import org.apache.commons.lang.math.NumberUtils;
22 import org.apache.struts.action.ActionForm;
23 import org.apache.struts.action.ActionForward;
24 import org.apache.struts.action.ActionMapping;
25 import org.apache.struts.action.ActionMessage;
26 import org.apache.struts.action.ActionMessages;
27 import org.kuali.rice.core.api.config.CoreConfigHelper;
28 import org.kuali.rice.core.api.config.property.ConfigContext;
29 import org.kuali.rice.core.api.util.ConcreteKeyValue;
30 import org.kuali.rice.core.api.util.RiceConstants;
31 import org.kuali.rice.core.api.util.RiceUtilities;
32 import org.kuali.rice.core.api.util.io.SerializationUtils;
33 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
34 import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
35 import org.kuali.rice.ksb.api.registry.ServiceInfo;
36 import org.kuali.rice.ksb.messaging.MessageFetcher;
37 import org.kuali.rice.ksb.messaging.MessageServiceInvoker;
38 import org.kuali.rice.ksb.messaging.PersistedMessageBO;
39 import org.kuali.rice.ksb.messaging.service.MessageQueueService;
40 import org.kuali.rice.ksb.service.KSBServiceLocator;
41 import org.kuali.rice.ksb.util.KSBConstants;
42
43 import javax.servlet.ServletException;
44 import javax.servlet.http.HttpServletRequest;
45 import javax.servlet.http.HttpServletResponse;
46 import java.io.IOException;
47 import java.sql.Timestamp;
48 import java.util.ArrayList;
49 import java.util.Calendar;
50 import java.util.Collections;
51 import java.util.Comparator;
52 import java.util.Date;
53 import java.util.HashMap;
54 import java.util.Iterator;
55 import java.util.List;
56 import java.util.Map;
57
58
59
60
61
62
63
64 public class MessageQueueAction extends KSBAction {
65
66 @Override
67 public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request,
68 HttpServletResponse response) throws IOException, ServletException {
69 return mapping.findForward("report");
70 }
71
72 public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request,
73 HttpServletResponse response) throws Exception {
74 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
75 save(routeQueueForm);
76
77 routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
78 ActionMessages messages = new ActionMessages();
79 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved"));
80 saveMessages(request, messages);
81
82
83
84
85
86
87
88
89
90
91
92 return mapping.findForward("report");
93 }
94
95 public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
96 HttpServletResponse response) throws Exception {
97 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
98 PersistedMessageBO message = save(routeQueueForm);
99 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
100
101 ActionMessages messages = new ActionMessages();
102 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
103 saveMessages(request, messages);
104
105 routeQueueForm.setMessageId(null);
106 routeQueueForm.setMessageQueueFromDatabase(null);
107 routeQueueForm.setMessageQueueFromForm(null);
108 routeQueueForm.setShowEdit("yes");
109 routeQueueForm.setMethodToCall("");
110 establishRequiredState(request, form);
111 routeQueueForm.setMessageId(message.getRouteQueueId());
112 routeQueueForm.setMessageQueueFromForm(message);
113 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
114 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
115 return mapping.findForward("report");
116 }
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145 private PersistedMessageBO save(MessageQueueForm routeQueueForm) {
146 Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
147 if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
148 throw new IllegalArgumentException("Invalid routeQueueId passed in. Cannot save");
149 }
150
151 PersistedMessageBO existingMessage = KSBServiceLocator.getMessageQueueService().findByRouteQueueId(routeQueueId);
152 PersistedMessageBO message = routeQueueForm.getMessageQueueFromForm();
153
154 if (existingMessage == null) {
155 throw new RuntimeException("Could locate the existing message, it may have already been processed.");
156 }
157
158 existingMessage.setQueuePriority(message.getQueuePriority());
159 existingMessage.setIpNumber(message.getIpNumber());
160 existingMessage.setLockVerNbr(message.getLockVerNbr());
161 existingMessage.setApplicationId(message.getApplicationId());
162 existingMessage.setMethodName(message.getMethodName());
163 existingMessage.setQueueStatus(message.getQueueStatus());
164 existingMessage.setRetryCount(message.getRetryCount());
165 existingMessage.setServiceName(message.getServiceName());
166 existingMessage.setValue1(message.getValue1());
167 existingMessage.setValue2(message.getValue2());
168 KSBServiceLocator.getMessageQueueService().save(existingMessage);
169 return existingMessage;
170 }
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202 protected void quickRequeueMessage(PersistedMessageBO message) {
203 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
204 message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis()));
205 message.setRetryCount(new Integer(0));
206 getRouteQueueService().save(message);
207 }
208
209 public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request,
210 HttpServletResponse response) throws Exception {
211 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
212 if (routeQueueForm.getMessageQueueFromDatabase() == null) {
213 throw new IllegalArgumentException("No messageId passed in with the Request.");
214 }
215
216 PersistedMessageBO message = routeQueueForm.getMessageQueueFromDatabase();
217 quickRequeueMessage(message);
218 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
219
220 ActionMessages messages = new ActionMessages();
221 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued"));
222 saveMessages(request, messages);
223
224 routeQueueForm.setMessageQueueFromDatabase(null);
225 routeQueueForm.setMessageQueueFromForm(null);
226 routeQueueForm.setMessageId(null);
227 routeQueueForm.setMethodToCall("");
228
229
230 establishRequiredState(request, form);
231 return mapping.findForward("report");
232 }
233
234 public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
235 HttpServletResponse response) throws IOException, ServletException {
236 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
237 routeQueueForm.setShowEdit("yes");
238 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
239 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
240 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
241 return mapping.findForward("basic");
242 }
243
244 public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request,
245 HttpServletResponse response) throws Exception {
246 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
247 routeQueueForm.setShowEdit("no");
248 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
249 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
250 AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase());
251 routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload);
252 return mapping.findForward("payload");
253 }
254
255 public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request,
256 HttpServletResponse response) throws Exception {
257 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
258 if (routeQueueForm.getShowEdit().equals("yes")) {
259 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
260 }
261 return mapping.findForward("basic");
262 }
263
264 public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request,
265 HttpServletResponse response) throws Exception {
266 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
267 routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
268 routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
269 routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
270 routeQueueForm.getMessageQueueFromForm().setExpirationDate(null);
271 routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
272 routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
273 routeQueueForm.getMessageQueueFromForm().setServiceName(null);
274 routeQueueForm.getMessageQueueFromForm().setApplicationId(null);
275 routeQueueForm.getMessageQueueFromForm().setMethodName(null);
276 routeQueueForm.getMessageQueueFromForm().setPayload(null);
277 routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
278 routeQueueForm.setExistingQueueDate(null);
279 routeQueueForm.setNewQueueDate(null);
280 return mapping.findForward("basic");
281 }
282
283 public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request,
284 HttpServletResponse response) throws Exception {
285 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
286 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
287 routeQueueForm.setMessageQueueFromDatabase(null);
288 getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm());
289 ActionMessages messages = new ActionMessages();
290 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm
291 .getMessageQueueFromForm().getRouteQueueId().toString()));
292 saveMessages(request, messages);
293 routeQueueForm.setMessageId(null);
294 establishRequiredState(request, form);
295 return mapping.findForward("report");
296 }
297
298 public ActionForward executeMessageFetcher(ActionMapping mapping, ActionForm form, HttpServletRequest request,
299 HttpServletResponse response) throws Exception {
300 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
301 ActionMessages messages = new ActionMessages();
302 if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
303 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages()));
304 }
305 if (!messages.isEmpty()) {
306 saveMessages(request, messages);
307 return mapping.findForward("report");
308 }
309 new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run();
310 return mapping.findForward("report");
311 }
312
313
314
315
316
317
318
319 @Override
320 public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception {
321 request.setAttribute("rice_constant", getServlet().getServletContext().getAttribute("RiceConstants"));
322 request.setAttribute("ksb_constant", getServlet().getServletContext().getAttribute("KSBConstants"));
323 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
324 routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
325 routeQueueForm.setMyApplicationId(CoreConfigHelper.getApplicationId());
326 routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE));
327 routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
328 routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGING_OFF));
329 List<ServiceInfo> services = KsbApiServiceLocator.getServiceRegistry().getAllOnlineServices();
330 if (routeQueueForm.getMessageId() != null) {
331 PersistedMessageBO rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId());
332 if (rq != null) {
333 routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date()));
334 routeQueueForm.setMessageQueueFromDatabase(rq);
335
336 String serviceName = rq.getServiceName();
337 for (ServiceInfo serviceInfo : services) {
338 if (serviceInfo.getServiceName().equals(serviceName)) {
339 routeQueueForm.getIpAddresses().add(
340 new ConcreteKeyValue(serviceInfo.getServerIpAddress(), serviceInfo.getServerIpAddress()));
341 }
342 }
343 } else {
344 ActionMessages messages = new ActionMessages();
345 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
346 "messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString()));
347 return messages;
348 }
349 routeQueueForm.setMessageId(null);
350 } else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) {
351 List<PersistedMessageBO> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1);
352 if (queueEntries.size() > 0) {
353 Collections.sort(queueEntries, new Comparator() {
354 private Comparator comp = new ComparableComparator();
355
356 @Override
357 public int compare(Object object1, Object object2) {
358 if (object1 == null && object2 == null) {
359 return 0;
360 } else if (object1 == null) {
361 return 1;
362 } else if (object2 == null) {
363 return -1;
364 }
365 Long id1 = ((PersistedMessageBO) object1).getRouteQueueId();
366 Long id2 = ((PersistedMessageBO) object2).getRouteQueueId();
367
368 try {
369 return this.comp.compare(id1, id2);
370 } catch (Exception e) {
371 return 0;
372 }
373 }
374 });
375 }
376 routeQueueForm.setMessageQueueRows(queueEntries);
377 }
378 return null;
379 }
380
381 protected List<PersistedMessageBO> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) {
382 List<PersistedMessageBO> routeQueues = new ArrayList<PersistedMessageBO>();
383
384
385 if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
386 routeQueues.addAll(getRouteQueueService().findAll(maxRows));
387 }
388
389
390 else {
391 if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) {
392 if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) {
393
394 throw new RuntimeException("Message Id must be a number.");
395 }
396 }
397
398 Map<String, String> criteriaValues = new HashMap<String, String>();
399 String key = null;
400 String value = null;
401 String trimmedKey = null;
402 for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) {
403 key = (String) iter.next();
404 if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
405 value = request.getParameter(key);
406 if (StringUtils.isNotBlank(value)) {
407 trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX));
408 criteriaValues.put(trimmedKey, value);
409 }
410 }
411 }
412 routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows));
413 }
414 return routeQueues;
415 }
416
417 private MessageQueueService getRouteQueueService() {
418 return KSBServiceLocator.getMessageQueueService();
419 }
420
421
422
423
424
425
426
427
428
429
430
431 protected AsynchronousCall unwrapPayload(PersistedMessageBO message) {
432 if (message == null || message.getPayload() == null) {
433 return null;
434 }
435 String encodedPayload = message.getPayload().getPayload();
436 if (StringUtils.isBlank(encodedPayload)) {
437 return null;
438 }
439 Object decodedPayload = null;
440 if (encodedPayload != null) {
441 decodedPayload = SerializationUtils.deserializeFromBase64(encodedPayload);
442 }
443
444 if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) {
445 throw new IllegalArgumentException("PersistedMessageBO payload was not of the expected class. " + "Expected was ["
446 + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]");
447 }
448 return (AsynchronousCall) decodedPayload;
449 }
450
451 }