1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package org.kuali.rice.ksb.messaging;
17  
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.api.config.property.ConfigContext;
20  import org.kuali.rice.ksb.messaging.service.MessageQueueService;
21  import org.kuali.rice.ksb.service.KSBServiceLocator;
22  import org.kuali.rice.ksb.util.KSBConstants;
23  import org.springframework.transaction.TransactionStatus;
24  import org.springframework.transaction.support.TransactionCallback;
25  
26  
27  
28  
29  
30  
31  
32  public class MessageFetcher implements Runnable {
33  
34      private static final Logger LOG = Logger.getLogger(MessageFetcher.class);
35  
36      private Integer maxMessages;
37      private Long routeQueueId;
38  
39      public MessageFetcher(Integer maxMessages) {
40  	this.maxMessages = maxMessages;
41      }
42  
43      public MessageFetcher(Long routeQueueId) {
44  	this.routeQueueId = routeQueueId;
45      }
46  
47      public void run() {
48      	if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, false)) {
49      		try {
50      			requeueDocument();
51      			requeueMessages();
52      		} catch (Throwable t) {
53      			LOG.error("Failed to fetch messages.", t);
54      		}
55      	}
56      }
57  
58      private void requeueMessages() {
59  	if (this.routeQueueId == null) {
60  	    try {
61  		for (final PersistedMessageBO message : getRouteQueueService().getNextDocuments(maxMessages)) {
62  		    markEnrouteAndSaveMessage(message);
63  		    executeMessage(message);
64  		}
65  	    } catch (Throwable t) {
66  		LOG.error("Failed to fetch or process some messages during requeueMessages", t);
67  	    }
68  	}
69      }
70  
71      private void requeueDocument() {
72  	try {
73  	    if (this.routeQueueId != null) {
74  		PersistedMessageBO message = getRouteQueueService().findByRouteQueueId(this.routeQueueId);
75  		message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
76  		getRouteQueueService().save(message);
77  		executeMessage(message);
78  	    }
79  	} catch (Throwable t) {
80  	    LOG.error("Failed to fetch or process some messages during requeueDocument", t);
81  	}
82      }
83  
84      private void executeMessage(PersistedMessageBO message) {
85  	try {
86  	    KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
87  	} catch (Throwable t) {
88  	    LOG.error("Failed to place message " + message + " in thread pool for execution", t);
89  	}
90      }
91  
92      private void markEnrouteAndSaveMessage(final PersistedMessageBO message) {
93  	try {
94  	    KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback() {
95  		public Object doInTransaction(TransactionStatus status) {
96  		    message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
97  		    getRouteQueueService().save(message);
98  		    return null;
99  		}
100 	    });
101 	} catch (Throwable t) {
102 	    LOG.error("Caught error attempting to mark message " + message + " as R", t);
103 	}
104     }
105 
106     private MessageQueueService getRouteQueueService() {
107 	return KSBServiceLocator.getMessageQueueService();
108     }
109 
110 }