1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging;
17
18 import org.apache.log4j.Logger;
19 import org.kuali.rice.core.api.config.property.ConfigContext;
20 import org.kuali.rice.ksb.messaging.service.MessageQueueService;
21 import org.kuali.rice.ksb.service.KSBServiceLocator;
22 import org.kuali.rice.ksb.util.KSBConstants;
23 import org.springframework.transaction.TransactionStatus;
24 import org.springframework.transaction.support.TransactionCallback;
25
26
27
28
29
30
31
32 public class MessageFetcher implements Runnable {
33
34 private static final Logger LOG = Logger.getLogger(MessageFetcher.class);
35
36 private Integer maxMessages;
37 private Long routeQueueId;
38
39 public MessageFetcher(Integer maxMessages) {
40 this.maxMessages = maxMessages;
41 }
42
43 public MessageFetcher(Long routeQueueId) {
44 this.routeQueueId = routeQueueId;
45 }
46
47 public void run() {
48 if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, false)) {
49 try {
50 requeueDocument();
51 requeueMessages();
52 } catch (Throwable t) {
53 LOG.error("Failed to fetch messages.", t);
54 }
55 }
56 }
57
58 private void requeueMessages() {
59 if (this.routeQueueId == null) {
60 try {
61 for (final PersistedMessageBO message : getRouteQueueService().getNextDocuments(maxMessages)) {
62 markEnrouteAndSaveMessage(message);
63 executeMessage(message);
64 }
65 } catch (Throwable t) {
66 LOG.error("Failed to fetch or process some messages during requeueMessages", t);
67 }
68 }
69 }
70
71 private void requeueDocument() {
72 try {
73 if (this.routeQueueId != null) {
74 PersistedMessageBO message = getRouteQueueService().findByRouteQueueId(this.routeQueueId);
75 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
76 getRouteQueueService().save(message);
77 executeMessage(message);
78 }
79 } catch (Throwable t) {
80 LOG.error("Failed to fetch or process some messages during requeueDocument", t);
81 }
82 }
83
84 private void executeMessage(PersistedMessageBO message) {
85 try {
86 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
87 } catch (Throwable t) {
88 LOG.error("Failed to place message " + message + " in thread pool for execution", t);
89 }
90 }
91
92 private void markEnrouteAndSaveMessage(final PersistedMessageBO message) {
93 try {
94 KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback() {
95 public Object doInTransaction(TransactionStatus status) {
96 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
97 getRouteQueueService().save(message);
98 return null;
99 }
100 });
101 } catch (Throwable t) {
102 LOG.error("Caught error attempting to mark message " + message + " as R", t);
103 }
104 }
105
106 private MessageQueueService getRouteQueueService() {
107 return KSBServiceLocator.getMessageQueueService();
108 }
109
110 }