1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.web;
17
18 import org.apache.commons.collections.comparators.ComparableComparator;
19 import org.apache.commons.lang.StringUtils;
20 import org.apache.commons.lang.math.NumberUtils;
21 import org.apache.struts.action.ActionForm;
22 import org.apache.struts.action.ActionForward;
23 import org.apache.struts.action.ActionMapping;
24 import org.apache.struts.action.ActionMessage;
25 import org.apache.struts.action.ActionMessages;
26 import org.kuali.rice.core.api.config.CoreConfigHelper;
27 import org.kuali.rice.core.api.config.property.ConfigContext;
28 import org.kuali.rice.core.api.util.ConcreteKeyValue;
29 import org.kuali.rice.core.api.util.RiceConstants;
30 import org.kuali.rice.core.api.util.RiceUtilities;
31 import org.kuali.rice.core.api.util.io.SerializationUtils;
32 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
33 import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
34 import org.kuali.rice.ksb.api.registry.ServiceInfo;
35 import org.kuali.rice.ksb.messaging.MessageFetcher;
36 import org.kuali.rice.ksb.messaging.MessageServiceInvoker;
37 import org.kuali.rice.ksb.messaging.PersistedMessageBO;
38 import org.kuali.rice.ksb.messaging.service.MessageQueueService;
39 import org.kuali.rice.ksb.service.KSBServiceLocator;
40 import org.kuali.rice.ksb.util.KSBConstants;
41
42 import javax.servlet.ServletException;
43 import javax.servlet.http.HttpServletRequest;
44 import javax.servlet.http.HttpServletResponse;
45 import java.io.IOException;
46 import java.sql.Timestamp;
47 import java.util.ArrayList;
48 import java.util.Calendar;
49 import java.util.Collections;
50 import java.util.Comparator;
51 import java.util.Date;
52 import java.util.HashMap;
53 import java.util.Iterator;
54 import java.util.List;
55 import java.util.Map;
56
57
58
59
60
61
62
63 public class MessageQueueAction extends KSBAction {
64
65 @Override
66 public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request,
67 HttpServletResponse response) throws IOException, ServletException {
68 return mapping.findForward("report");
69 }
70
71 public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request,
72 HttpServletResponse response) throws Exception {
73 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
74 save(routeQueueForm);
75
76 routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
77 ActionMessages messages = new ActionMessages();
78 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved"));
79 saveMessages(request, messages);
80
81
82
83
84
85
86
87
88
89
90
91 return mapping.findForward("report");
92 }
93
94 public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
95 HttpServletResponse response) throws Exception {
96 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
97 PersistedMessageBO message = save(routeQueueForm);
98 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
99
100 ActionMessages messages = new ActionMessages();
101 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
102 saveMessages(request, messages);
103
104 routeQueueForm.setMessageId(null);
105 routeQueueForm.setMessageQueueFromDatabase(null);
106 routeQueueForm.setMessageQueueFromForm(null);
107 routeQueueForm.setShowEdit("yes");
108 routeQueueForm.setMethodToCall("");
109 establishRequiredState(request, form);
110 routeQueueForm.setMessageId(message.getRouteQueueId());
111 routeQueueForm.setMessageQueueFromForm(message);
112 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
113 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
114 return mapping.findForward("report");
115 }
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144 private PersistedMessageBO save(MessageQueueForm routeQueueForm) {
145 Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
146 if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
147 throw new IllegalArgumentException("Invalid routeQueueId passed in. Cannot save");
148 }
149
150 PersistedMessageBO existingMessage = KSBServiceLocator.getMessageQueueService().findByRouteQueueId(routeQueueId);
151 PersistedMessageBO message = routeQueueForm.getMessageQueueFromForm();
152
153 if (existingMessage == null) {
154 throw new RuntimeException("Could locate the existing message, it may have already been processed.");
155 }
156
157 existingMessage.setQueuePriority(message.getQueuePriority());
158 existingMessage.setIpNumber(message.getIpNumber());
159 existingMessage.setLockVerNbr(message.getLockVerNbr());
160 existingMessage.setApplicationId(message.getApplicationId());
161 existingMessage.setMethodName(message.getMethodName());
162 existingMessage.setQueueStatus(message.getQueueStatus());
163 existingMessage.setRetryCount(message.getRetryCount());
164 existingMessage.setServiceName(message.getServiceName());
165 existingMessage.setValue1(message.getValue1());
166 existingMessage.setValue2(message.getValue2());
167 KSBServiceLocator.getMessageQueueService().save(existingMessage);
168 return existingMessage;
169 }
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201 protected void quickRequeueMessage(PersistedMessageBO message) {
202 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
203 message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis()));
204 message.setRetryCount(new Integer(0));
205 getRouteQueueService().save(message);
206 }
207
208 public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request,
209 HttpServletResponse response) throws Exception {
210 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
211 if (routeQueueForm.getMessageQueueFromDatabase() == null) {
212 throw new IllegalArgumentException("No messageId passed in with the Request.");
213 }
214
215 PersistedMessageBO message = routeQueueForm.getMessageQueueFromDatabase();
216 quickRequeueMessage(message);
217 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
218
219 ActionMessages messages = new ActionMessages();
220 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued"));
221 saveMessages(request, messages);
222
223 routeQueueForm.setMessageQueueFromDatabase(null);
224 routeQueueForm.setMessageQueueFromForm(null);
225 routeQueueForm.setMessageId(null);
226 routeQueueForm.setMethodToCall("");
227
228
229 establishRequiredState(request, form);
230 return mapping.findForward("report");
231 }
232
233 public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
234 HttpServletResponse response) throws IOException, ServletException {
235 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
236 routeQueueForm.setShowEdit("yes");
237 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
238 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
239 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
240 return mapping.findForward("basic");
241 }
242
243 public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request,
244 HttpServletResponse response) throws Exception {
245 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
246 routeQueueForm.setShowEdit("no");
247 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
248 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
249 AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase());
250 routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload);
251 return mapping.findForward("payload");
252 }
253
254 public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request,
255 HttpServletResponse response) throws Exception {
256 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
257 if (routeQueueForm.getShowEdit().equals("yes")) {
258 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
259 }
260 return mapping.findForward("basic");
261 }
262
263 public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request,
264 HttpServletResponse response) throws Exception {
265 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
266 routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
267 routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
268 routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
269 routeQueueForm.getMessageQueueFromForm().setExpirationDate(null);
270 routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
271 routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
272 routeQueueForm.getMessageQueueFromForm().setServiceName(null);
273 routeQueueForm.getMessageQueueFromForm().setApplicationId(null);
274 routeQueueForm.getMessageQueueFromForm().setMethodName(null);
275 routeQueueForm.getMessageQueueFromForm().setPayload(null);
276 routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
277 routeQueueForm.setExistingQueueDate(null);
278 routeQueueForm.setNewQueueDate(null);
279 return mapping.findForward("basic");
280 }
281
282 public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request,
283 HttpServletResponse response) throws Exception {
284 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
285 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
286 routeQueueForm.setMessageQueueFromDatabase(null);
287 getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm());
288 ActionMessages messages = new ActionMessages();
289 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm
290 .getMessageQueueFromForm().getRouteQueueId().toString()));
291 saveMessages(request, messages);
292 routeQueueForm.setMessageId(null);
293 establishRequiredState(request, form);
294 return mapping.findForward("report");
295 }
296
297 public ActionForward executeMessageFetcher(ActionMapping mapping, ActionForm form, HttpServletRequest request,
298 HttpServletResponse response) throws Exception {
299 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
300 ActionMessages messages = new ActionMessages();
301 if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
302 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages()));
303 }
304 if (!messages.isEmpty()) {
305 saveMessages(request, messages);
306 return mapping.findForward("report");
307 }
308 new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run();
309 return mapping.findForward("report");
310 }
311
312
313
314
315
316
317
318 @Override
319 public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception {
320 request.setAttribute("rice_constant", getServlet().getServletContext().getAttribute("RiceConstants"));
321 request.setAttribute("ksb_constant", getServlet().getServletContext().getAttribute("KSBConstants"));
322 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
323 routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
324 routeQueueForm.setMyApplicationId(CoreConfigHelper.getApplicationId());
325 routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE));
326 routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
327 routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGING_OFF));
328 List<ServiceInfo> services = KsbApiServiceLocator.getServiceRegistry().getAllOnlineServices();
329 if (routeQueueForm.getMessageId() != null) {
330 PersistedMessageBO rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId());
331 if (rq != null) {
332 routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date()));
333 routeQueueForm.setMessageQueueFromDatabase(rq);
334
335 String serviceName = rq.getServiceName();
336 for (ServiceInfo serviceInfo : services) {
337 if (serviceInfo.getServiceName().equals(serviceName)) {
338 routeQueueForm.getIpAddresses().add(
339 new ConcreteKeyValue(serviceInfo.getServerIpAddress(), serviceInfo.getServerIpAddress()));
340 }
341 }
342 } else {
343 ActionMessages messages = new ActionMessages();
344 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
345 "messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString()));
346 return messages;
347 }
348 routeQueueForm.setMessageId(null);
349 } else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) {
350 List<PersistedMessageBO> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1);
351 if (queueEntries.size() > 0) {
352 Collections.sort(queueEntries, new Comparator() {
353 private Comparator comp = new ComparableComparator();
354
355 @Override
356 public int compare(Object object1, Object object2) {
357 if (object1 == null && object2 == null) {
358 return 0;
359 } else if (object1 == null) {
360 return 1;
361 } else if (object2 == null) {
362 return -1;
363 }
364 Long id1 = ((PersistedMessageBO) object1).getRouteQueueId();
365 Long id2 = ((PersistedMessageBO) object2).getRouteQueueId();
366
367 try {
368 return this.comp.compare(id1, id2);
369 } catch (Exception e) {
370 return 0;
371 }
372 }
373 });
374 }
375 routeQueueForm.setMessageQueueRows(queueEntries);
376 }
377 return null;
378 }
379
380 protected List<PersistedMessageBO> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) {
381 List<PersistedMessageBO> routeQueues = new ArrayList<PersistedMessageBO>();
382
383
384 if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
385 routeQueues.addAll(getRouteQueueService().findAll(maxRows));
386 }
387
388
389 else {
390 if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) {
391 if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) {
392
393 throw new RuntimeException("Message Id must be a number.");
394 }
395 }
396
397 Map<String, String> criteriaValues = new HashMap<String, String>();
398 String key = null;
399 String value = null;
400 String trimmedKey = null;
401 for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) {
402 key = (String) iter.next();
403 if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
404 value = request.getParameter(key);
405 if (StringUtils.isNotBlank(value)) {
406 trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX));
407 criteriaValues.put(trimmedKey, value);
408 }
409 }
410 }
411 routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows));
412 }
413 return routeQueues;
414 }
415
416 private MessageQueueService getRouteQueueService() {
417 return KSBServiceLocator.getMessageQueueService();
418 }
419
420
421
422
423
424
425
426
427
428
429
430 protected AsynchronousCall unwrapPayload(PersistedMessageBO message) {
431 if (message == null || message.getPayload() == null) {
432 return null;
433 }
434 String encodedPayload = message.getPayload().getPayload();
435 if (StringUtils.isBlank(encodedPayload)) {
436 return null;
437 }
438 Object decodedPayload = null;
439 if (encodedPayload != null) {
440 decodedPayload = SerializationUtils.deserializeFromBase64(encodedPayload);
441 }
442
443 if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) {
444 throw new IllegalArgumentException("PersistedMessageBO payload was not of the expected class. " + "Expected was ["
445 + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]");
446 }
447 return (AsynchronousCall) decodedPayload;
448 }
449
450 }