1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging;
17
18 import org.apache.log4j.Logger;
19 import org.kuali.rice.core.api.config.property.ConfigContext;
20 import org.kuali.rice.ksb.messaging.service.MessageQueueService;
21 import org.kuali.rice.ksb.service.KSBServiceLocator;
22 import org.kuali.rice.ksb.util.KSBConstants;
23 import org.springframework.transaction.TransactionStatus;
24 import org.springframework.transaction.support.TransactionCallback;
25
26
27
28
29
30
31 public class MessageFetcher implements Runnable {
32
33 private static final Logger LOG = Logger.getLogger(MessageFetcher.class);
34
35 private Integer maxMessages;
36 private Long routeQueueId;
37
38 public MessageFetcher(Integer maxMessages) {
39 this.maxMessages = maxMessages;
40 }
41
42 public MessageFetcher(Long routeQueueId) {
43 this.routeQueueId = routeQueueId;
44 }
45
46 public void run() {
47 if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE,
48 false)) {
49 try {
50 requeueDocument();
51 requeueMessages();
52 } catch (Throwable t) {
53 LOG.error("Failed to fetch messages.", t);
54 }
55 }
56 }
57
58 private void requeueMessages() {
59 if (this.routeQueueId == null) {
60 try {
61 for (PersistedMessageBO message : getRouteQueueService().getNextDocuments(maxMessages)) {
62 message = markEnrouteAndSaveMessage(message);
63 executeMessage(message);
64 }
65 } catch (Throwable t) {
66 LOG.error("Failed to fetch or process some messages during requeueMessages", t);
67 }
68 }
69 }
70
71 private void requeueDocument() {
72 try {
73 if (this.routeQueueId != null) {
74 PersistedMessageBO message = getRouteQueueService().findByRouteQueueId(this.routeQueueId);
75 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
76 message = getRouteQueueService().save(message);
77 executeMessage(message);
78 }
79 } catch (Throwable t) {
80 LOG.error("Failed to fetch or process some messages during requeueDocument", t);
81 }
82 }
83
84 private void executeMessage(PersistedMessageBO message) {
85 try {
86 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
87 } catch (Throwable t) {
88 LOG.error("Failed to place message " + message + " in thread pool for execution", t);
89 }
90 }
91
92 private PersistedMessageBO markEnrouteAndSaveMessage(final PersistedMessageBO message) {
93 try {
94 return KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<PersistedMessageBO>() {
95 public PersistedMessageBO doInTransaction(TransactionStatus status) {
96 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
97 return getRouteQueueService().save(message);
98 }
99 });
100 } catch (Throwable t) {
101 LOG.error("Caught error attempting to mark message " + message + " as R", t);
102 }
103 return message;
104 }
105
106 private MessageQueueService getRouteQueueService() {
107 return KSBServiceLocator.getMessageQueueService();
108 }
109
110 }