1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging;
17
18 import org.apache.log4j.Logger;
19 import org.kuali.rice.core.api.config.property.ConfigContext;
20 import org.kuali.rice.ksb.messaging.service.MessageQueueService;
21 import org.kuali.rice.ksb.service.KSBServiceLocator;
22 import org.kuali.rice.ksb.util.KSBConstants;
23 import org.kuali.rice.ksb.util.KSBConstants.Config;
24 import org.springframework.transaction.TransactionStatus;
25 import org.springframework.transaction.support.TransactionCallback;
26
27
28
29
30
31
32
33 public class MessageFetcher implements Runnable {
34
35 private static final Logger LOG = Logger.getLogger(MessageFetcher.class);
36
37 private Integer maxMessages;
38 private Long routeQueueId;
39
40 public MessageFetcher(Integer maxMessages) {
41 this.maxMessages = maxMessages;
42 }
43
44 public MessageFetcher(Long routeQueueId) {
45 this.routeQueueId = routeQueueId;
46 }
47
48 public void run() {
49 if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, false)) {
50 try {
51 requeueDocument();
52 requeueMessages();
53 } catch (Throwable t) {
54 LOG.error("Failed to fetch messages.", t);
55 }
56 }
57 }
58
59 private void requeueMessages() {
60 if (this.routeQueueId == null) {
61 try {
62 for (final PersistedMessageBO message : getRouteQueueService().getNextDocuments(maxMessages)) {
63 markEnrouteAndSaveMessage(message);
64 executeMessage(message);
65 }
66 } catch (Throwable t) {
67 LOG.error("Failed to fetch or process some messages during requeueMessages", t);
68 }
69 }
70 }
71
72 private void requeueDocument() {
73 try {
74 if (this.routeQueueId != null) {
75 PersistedMessageBO message = getRouteQueueService().findByRouteQueueId(this.routeQueueId);
76 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
77 getRouteQueueService().save(message);
78 executeMessage(message);
79 }
80 } catch (Throwable t) {
81 LOG.error("Failed to fetch or process some messages during requeueDocument", t);
82 }
83 }
84
85 private void executeMessage(PersistedMessageBO message) {
86 try {
87 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
88 } catch (Throwable t) {
89 LOG.error("Failed to place message " + message + " in thread pool for execution", t);
90 }
91 }
92
93 private void markEnrouteAndSaveMessage(final PersistedMessageBO message) {
94 try {
95 KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback() {
96 public Object doInTransaction(TransactionStatus status) {
97 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
98 getRouteQueueService().save(message);
99 return null;
100 }
101 });
102 } catch (Throwable t) {
103 LOG.error("Caught error attempting to mark message " + message + " as R", t);
104 }
105 }
106
107 private MessageQueueService getRouteQueueService() {
108 return KSBServiceLocator.getMessageQueueService();
109 }
110
111 }