1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging;
17
18 import org.apache.log4j.Logger;
19 import org.kuali.rice.ksb.messaging.service.MessageQueueService;
20 import org.kuali.rice.ksb.service.KSBServiceLocator;
21 import org.kuali.rice.ksb.util.KSBConstants;
22 import org.springframework.transaction.TransactionStatus;
23 import org.springframework.transaction.support.TransactionCallback;
24
25
26
27
28
29
30
31 public class MessageFetcher implements Runnable {
32
33 private static final Logger LOG = Logger.getLogger(MessageFetcher.class);
34
35 private Integer maxMessages;
36 private Long routeQueueId;
37
38 public MessageFetcher(Integer maxMessages) {
39 this.maxMessages = maxMessages;
40 }
41
42 public MessageFetcher(Long routeQueueId) {
43 this.routeQueueId = routeQueueId;
44 }
45
46 public void run() {
47 try {
48 requeueDocument();
49 requeueMessages();
50 } catch (Throwable t) {
51 LOG.error("Failed to fetch messages.", t);
52 }
53 }
54
55 private void requeueMessages() {
56 if (this.routeQueueId == null) {
57 try {
58 for (final PersistedMessage message : getRouteQueueService().getNextDocuments(maxMessages)) {
59 markEnrouteAndSaveMessage(message);
60 executeMessage(message);
61 }
62 } catch (Throwable t) {
63 LOG.error("Failed to fetch or process some messages during requeueMessages", t);
64 }
65 }
66 }
67
68 private void requeueDocument() {
69 try {
70 if (this.routeQueueId != null) {
71 PersistedMessage message = getRouteQueueService().findByRouteQueueId(this.routeQueueId);
72 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
73 getRouteQueueService().save(message);
74 executeMessage(message);
75 }
76 } catch (Throwable t) {
77 LOG.error("Failed to fetch or process some messages during requeueDocument", t);
78 }
79 }
80
81 private void executeMessage(PersistedMessage message) {
82 try {
83 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
84 } catch (Throwable t) {
85 LOG.error("Failed to place message " + message + " in thread pool for execution", t);
86 }
87 }
88
89 private void markEnrouteAndSaveMessage(final PersistedMessage message) {
90 try {
91 KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback() {
92 public Object doInTransaction(TransactionStatus status) {
93 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
94 getRouteQueueService().save(message);
95 return null;
96 }
97 });
98 } catch (Throwable t) {
99 LOG.error("Caught error attempting to mark message " + message + " as R", t);
100 }
101 }
102
103 private MessageQueueService getRouteQueueService() {
104 return KSBServiceLocator.getRouteQueueService();
105 }
106
107 }