1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.web;
17
18 import org.apache.commons.collections.comparators.ComparableComparator;
19 import org.apache.commons.lang.StringUtils;
20 import org.apache.commons.lang.math.NumberUtils;
21 import org.apache.struts.action.ActionForm;
22 import org.apache.struts.action.ActionForward;
23 import org.apache.struts.action.ActionMapping;
24 import org.apache.struts.action.ActionMessage;
25 import org.apache.struts.action.ActionMessages;
26 import org.kuali.rice.core.api.config.CoreConfigHelper;
27 import org.kuali.rice.core.api.config.property.ConfigContext;
28 import org.kuali.rice.core.api.util.ConcreteKeyValue;
29 import org.kuali.rice.core.api.util.RiceConstants;
30 import org.kuali.rice.core.api.util.RiceUtilities;
31 import org.kuali.rice.core.api.util.io.SerializationUtils;
32 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
33 import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
34 import org.kuali.rice.ksb.api.registry.ServiceInfo;
35 import org.kuali.rice.ksb.messaging.MessageFetcher;
36 import org.kuali.rice.ksb.messaging.MessageServiceInvoker;
37 import org.kuali.rice.ksb.messaging.PersistedMessageBO;
38 import org.kuali.rice.ksb.messaging.service.MessageQueueService;
39 import org.kuali.rice.ksb.service.KSBServiceLocator;
40 import org.kuali.rice.ksb.util.KSBConstants;
41
42 import javax.servlet.ServletException;
43 import javax.servlet.http.HttpServletRequest;
44 import javax.servlet.http.HttpServletResponse;
45 import java.io.IOException;
46 import java.sql.Timestamp;
47 import java.util.ArrayList;
48 import java.util.Calendar;
49 import java.util.Collections;
50 import java.util.Comparator;
51 import java.util.Date;
52 import java.util.HashMap;
53 import java.util.Iterator;
54 import java.util.List;
55 import java.util.Map;
56
57
58
59
60
61
62
63 public class MessageQueueAction extends KSBAction {
64
65 @Override
66 public ActionForward start(ActionMapping mapping, ActionForm form, HttpServletRequest request,
67 HttpServletResponse response) throws IOException, ServletException {
68 return mapping.findForward("report");
69 }
70
71 public ActionForward save(ActionMapping mapping, ActionForm form, HttpServletRequest request,
72 HttpServletResponse response) throws Exception {
73 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
74 save(routeQueueForm);
75
76 routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
77 ActionMessages messages = new ActionMessages();
78 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.saved"));
79 saveMessages(request, messages);
80
81
82
83
84
85
86
87
88
89
90
91 return mapping.findForward("report");
92 }
93
94 public ActionForward saveAndResubmit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
95 HttpServletResponse response) throws Exception {
96 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
97 PersistedMessageBO message = save(routeQueueForm);
98 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
99
100 ActionMessages messages = new ActionMessages();
101 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.queued"));
102 saveMessages(request, messages);
103
104 routeQueueForm.setMessageId(null);
105 routeQueueForm.setMessageQueueFromDatabase(null);
106 routeQueueForm.setMessageQueueFromForm(null);
107 routeQueueForm.setShowEdit("yes");
108 routeQueueForm.setMethodToCall("");
109 establishRequiredState(request, form);
110 routeQueueForm.setMessageId(message.getRouteQueueId());
111 routeQueueForm.setMessageQueueFromForm(message);
112 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
113 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(message));
114 return mapping.findForward("report");
115 }
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144 private PersistedMessageBO save(MessageQueueForm routeQueueForm) {
145 Long routeQueueId = routeQueueForm.getMessageQueueFromForm().getRouteQueueId();
146 if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
147 throw new IllegalArgumentException("Invalid routeQueueId passed in. Cannot save");
148 }
149
150 PersistedMessageBO existingMessage = KSBServiceLocator.getMessageQueueService().findByRouteQueueId(routeQueueId);
151 PersistedMessageBO message = routeQueueForm.getMessageQueueFromForm();
152
153 if (existingMessage == null) {
154 throw new RuntimeException("Could locate the existing message, it may have already been processed.");
155 }
156
157 existingMessage.setQueuePriority(message.getQueuePriority());
158 existingMessage.setIpNumber(message.getIpNumber());
159 existingMessage.setLockVerNbr(message.getLockVerNbr());
160 existingMessage.setApplicationId(message.getApplicationId());
161 existingMessage.setMethodName(message.getMethodName());
162 existingMessage.setQueueStatus(message.getQueueStatus());
163 existingMessage.setRetryCount(message.getRetryCount());
164 existingMessage.setServiceName(message.getServiceName());
165 existingMessage.setValue1(message.getValue1());
166 existingMessage.setValue2(message.getValue2());
167 existingMessage = KSBServiceLocator.getMessageQueueService().save(existingMessage);
168 return existingMessage;
169 }
170
171
172
173
174
175
176
177
178
179
180 protected PersistedMessageBO quickRequeueMessage(PersistedMessageBO message) {
181 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
182 message.setQueueDate(new Timestamp(Calendar.getInstance().getTimeInMillis()));
183 message.setRetryCount(new Integer(0));
184 return getRouteQueueService().save(message);
185 }
186
187 public ActionForward quickRequeueMessage(ActionMapping mapping, ActionForm form, HttpServletRequest request,
188 HttpServletResponse response) throws Exception {
189 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
190 if (routeQueueForm.getMessageQueueFromDatabase() == null) {
191 throw new IllegalArgumentException("No messageId passed in with the Request.");
192 }
193
194 PersistedMessageBO message = routeQueueForm.getMessageQueueFromDatabase();
195 message = quickRequeueMessage(message);
196 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
197
198 ActionMessages messages = new ActionMessages();
199 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.requeued"));
200 saveMessages(request, messages);
201
202 routeQueueForm.setMessageQueueFromDatabase(null);
203 routeQueueForm.setMessageQueueFromForm(null);
204 routeQueueForm.setMessageId(null);
205 routeQueueForm.setMethodToCall("");
206
207
208 establishRequiredState(request, form);
209 return mapping.findForward("report");
210 }
211
212 public ActionForward edit(ActionMapping mapping, ActionForm form, HttpServletRequest request,
213 HttpServletResponse response) throws IOException, ServletException {
214 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
215 routeQueueForm.setShowEdit("yes");
216 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
217 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
218 routeQueueForm.getMessageQueueFromForm().setMethodCall(unwrapPayload(routeQueueForm.getMessageQueueFromForm()));
219 return mapping.findForward("basic");
220 }
221
222 public ActionForward view(ActionMapping mapping, ActionForm form, HttpServletRequest request,
223 HttpServletResponse response) throws Exception {
224 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
225 routeQueueForm.setShowEdit("no");
226 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
227 routeQueueForm.setNewQueueDate(routeQueueForm.getExistingQueueDate());
228 AsynchronousCall messagePayload = unwrapPayload(routeQueueForm.getMessageQueueFromDatabase());
229 routeQueueForm.getMessageQueueFromForm().setMethodCall(messagePayload);
230 return mapping.findForward("payload");
231 }
232
233 public ActionForward reset(ActionMapping mapping, ActionForm form, HttpServletRequest request,
234 HttpServletResponse response) throws Exception {
235 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
236 if (routeQueueForm.getShowEdit().equals("yes")) {
237 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
238 }
239 return mapping.findForward("basic");
240 }
241
242 public ActionForward clear(ActionMapping mapping, ActionForm form, HttpServletRequest request,
243 HttpServletResponse response) throws Exception {
244 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
245 routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
246 routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
247 routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
248 routeQueueForm.getMessageQueueFromForm().setExpirationDate(null);
249 routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
250 routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
251 routeQueueForm.getMessageQueueFromForm().setServiceName(null);
252 routeQueueForm.getMessageQueueFromForm().setApplicationId(null);
253 routeQueueForm.getMessageQueueFromForm().setMethodName(null);
254 routeQueueForm.getMessageQueueFromForm().setPayload(null);
255 routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
256 routeQueueForm.setExistingQueueDate(null);
257 routeQueueForm.setNewQueueDate(null);
258 return mapping.findForward("basic");
259 }
260
261 public ActionForward delete(ActionMapping mapping, ActionForm form, HttpServletRequest request,
262 HttpServletResponse response) throws Exception {
263 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
264 routeQueueForm.setMessageQueueFromForm(routeQueueForm.getMessageQueueFromDatabase());
265 routeQueueForm.setMessageQueueFromDatabase(null);
266 getRouteQueueService().delete(routeQueueForm.getMessageQueueFromForm());
267 ActionMessages messages = new ActionMessages();
268 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.deleted", routeQueueForm
269 .getMessageQueueFromForm().getRouteQueueId().toString()));
270 saveMessages(request, messages);
271 routeQueueForm.setMessageId(null);
272 establishRequiredState(request, form);
273 return mapping.findForward("report");
274 }
275
276 public ActionForward executeMessageFetcher(ActionMapping mapping, ActionForm form, HttpServletRequest request,
277 HttpServletResponse response) throws Exception {
278 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
279 ActionMessages messages = new ActionMessages();
280 if (routeQueueForm.getMaxMessageFetcherMessages() == null || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
281 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage("routequeue.RouteQueueService.invalidMessages", routeQueueForm.getMaxMessageFetcherMessages()));
282 }
283 if (!messages.isEmpty()) {
284 saveMessages(request, messages);
285 return mapping.findForward("report");
286 }
287 new MessageFetcher(routeQueueForm.getMaxMessageFetcherMessages()).run();
288 return mapping.findForward("report");
289 }
290
291
292
293
294
295
296
297 @Override
298 public ActionMessages establishRequiredState(HttpServletRequest request, ActionForm form) throws Exception {
299 request.setAttribute("rice_constant", getServlet().getServletContext().getAttribute("RiceConstants"));
300 request.setAttribute("ksb_constant", getServlet().getServletContext().getAttribute("KSBConstants"));
301 MessageQueueForm routeQueueForm = (MessageQueueForm) form;
302 routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
303 routeQueueForm.setMyApplicationId(CoreConfigHelper.getApplicationId());
304 routeQueueForm.setMessagePersistence(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE));
305 routeQueueForm.setMessageDelivery(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
306 routeQueueForm.setMessageOff(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGING_OFF));
307 List<ServiceInfo> services = KsbApiServiceLocator.getServiceRegistry().getAllOnlineServices();
308 if (routeQueueForm.getMessageId() != null) {
309 PersistedMessageBO rq = getRouteQueueService().findByRouteQueueId(routeQueueForm.getMessageId());
310 if (rq != null) {
311 routeQueueForm.setExistingQueueDate(RiceConstants.getDefaultDateFormat().format(new Date()));
312 routeQueueForm.setMessageQueueFromDatabase(rq);
313
314 String serviceName = rq.getServiceName();
315 for (ServiceInfo serviceInfo : services) {
316 if (serviceInfo.getServiceName().equals(serviceName)) {
317 routeQueueForm.getIpAddresses().add(
318 new ConcreteKeyValue(serviceInfo.getServerIpAddress(), serviceInfo.getServerIpAddress()));
319 }
320 }
321 } else {
322 ActionMessages messages = new ActionMessages();
323 messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
324 "messagequeue.RouteQueueService.queuedDocumentNotFound", routeQueueForm.getMessageId().toString()));
325 return messages;
326 }
327 routeQueueForm.setMessageId(null);
328 } else if (!"clear".equalsIgnoreCase(request.getParameter("methodToCall"))) {
329 List<PersistedMessageBO> queueEntries = findRouteQueues(request, routeQueueForm, routeQueueForm.getMaxRows() + 1);
330 if (queueEntries.size() > 0) {
331 Collections.sort(queueEntries, new Comparator() {
332 private Comparator comp = new ComparableComparator();
333
334 @Override
335 public int compare(Object object1, Object object2) {
336 if (object1 == null && object2 == null) {
337 return 0;
338 } else if (object1 == null) {
339 return 1;
340 } else if (object2 == null) {
341 return -1;
342 }
343 Long id1 = ((PersistedMessageBO) object1).getRouteQueueId();
344 Long id2 = ((PersistedMessageBO) object2).getRouteQueueId();
345
346 try {
347 return this.comp.compare(id1, id2);
348 } catch (Exception e) {
349 return 0;
350 }
351 }
352 });
353 }
354 routeQueueForm.setMessageQueueRows(queueEntries);
355 }
356 return null;
357 }
358
359 protected List<PersistedMessageBO> findRouteQueues(HttpServletRequest request, MessageQueueForm routeQueueForm, int maxRows) {
360 List<PersistedMessageBO> routeQueues = new ArrayList<PersistedMessageBO>();
361
362
363 if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
364 routeQueues.addAll(getRouteQueueService().findAll(maxRows));
365 }
366
367
368 else {
369 if (!StringUtils.isBlank(routeQueueForm.getRouteQueueIdFilter())) {
370 if (!NumberUtils.isNumber(routeQueueForm.getRouteQueueIdFilter())) {
371
372 throw new RuntimeException("Message Id must be a number.");
373 }
374 }
375
376 Map<String, String> criteriaValues = new HashMap<String, String>();
377 String key = null;
378 String value = null;
379 String trimmedKey = null;
380 for (Iterator iter = request.getParameterMap().keySet().iterator(); iter.hasNext();) {
381 key = (String) iter.next();
382 if (key.endsWith(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
383 value = request.getParameter(key);
384 if (StringUtils.isNotBlank(value)) {
385 trimmedKey = key.substring(0, key.indexOf(KSBConstants.ROUTE_QUEUE_FILTER_SUFFIX));
386 criteriaValues.put(trimmedKey, value);
387 }
388 }
389 }
390 routeQueues.addAll(getRouteQueueService().findByValues(criteriaValues, maxRows));
391 }
392 return routeQueues;
393 }
394
395 private MessageQueueService getRouteQueueService() {
396 return KSBServiceLocator.getMessageQueueService();
397 }
398
399
400
401
402
403
404
405
406
407
408
409 protected AsynchronousCall unwrapPayload(PersistedMessageBO message) {
410 if (message == null || message.getPayload() == null) {
411 return null;
412 }
413 String encodedPayload = message.getPayload().getPayload();
414 if (StringUtils.isBlank(encodedPayload)) {
415 return null;
416 }
417 Object decodedPayload = null;
418 if (encodedPayload != null) {
419 decodedPayload = SerializationUtils.deserializeFromBase64(encodedPayload);
420 }
421
422 if ((decodedPayload != null) && !(decodedPayload instanceof AsynchronousCall)) {
423 throw new IllegalArgumentException("PersistedMessageBO payload was not of the expected class. " + "Expected was ["
424 + AsynchronousCall.class.getName() + "], actual was: [" + decodedPayload.getClass().getName() + "]");
425 }
426 return (AsynchronousCall) decodedPayload;
427 }
428
429 }