View Javadoc

1   /**
2    * Copyright 2005-2014 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging;
17  
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.api.config.property.ConfigContext;
20  import org.kuali.rice.ksb.messaging.service.MessageQueueService;
21  import org.kuali.rice.ksb.service.KSBServiceLocator;
22  import org.kuali.rice.ksb.util.KSBConstants;
23  import org.springframework.transaction.TransactionStatus;
24  import org.springframework.transaction.support.TransactionCallback;
25  
26  /**
27   * Fetches messages from the db. Marks as 'R'. Gives messages to ThreadPool for execution
28   * 
29   * @author Kuali Rice Team (rice.collab@kuali.org)
30   * 
31   */
32  public class MessageFetcher implements Runnable {
33  
34      private static final Logger LOG = Logger.getLogger(MessageFetcher.class);
35  
36      private Integer maxMessages;
37      private Long routeQueueId;
38  
39      public MessageFetcher(Integer maxMessages) {
40  	this.maxMessages = maxMessages;
41      }
42  
43      public MessageFetcher(Long routeQueueId) {
44  	this.routeQueueId = routeQueueId;
45      }
46  
47      public void run() {
48      	if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, false)) {
49      		try {
50      			requeueDocument();
51      			requeueMessages();
52      		} catch (Throwable t) {
53      			LOG.error("Failed to fetch messages.", t);
54      		}
55      	}
56      }
57  
58      private void requeueMessages() {
59  	if (this.routeQueueId == null) {
60  	    try {
61  		for (final PersistedMessageBO message : getRouteQueueService().getNextDocuments(maxMessages)) {
62  		    markEnrouteAndSaveMessage(message);
63  		    executeMessage(message);
64  		}
65  	    } catch (Throwable t) {
66  		LOG.error("Failed to fetch or process some messages during requeueMessages", t);
67  	    }
68  	}
69      }
70  
71      private void requeueDocument() {
72  	try {
73  	    if (this.routeQueueId != null) {
74  		PersistedMessageBO message = getRouteQueueService().findByRouteQueueId(this.routeQueueId);
75  		message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
76  		getRouteQueueService().save(message);
77  		executeMessage(message);
78  	    }
79  	} catch (Throwable t) {
80  	    LOG.error("Failed to fetch or process some messages during requeueDocument", t);
81  	}
82      }
83  
84      private void executeMessage(PersistedMessageBO message) {
85  	try {
86  	    KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
87  	} catch (Throwable t) {
88  	    LOG.error("Failed to place message " + message + " in thread pool for execution", t);
89  	}
90      }
91  
92      private void markEnrouteAndSaveMessage(final PersistedMessageBO message) {
93  	try {
94  	    KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback() {
95  		public Object doInTransaction(TransactionStatus status) {
96  		    message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
97  		    getRouteQueueService().save(message);
98  		    return null;
99  		}
100 	    });
101 	} catch (Throwable t) {
102 	    LOG.error("Caught error attempting to mark message " + message + " as R", t);
103 	}
104     }
105 
106     private MessageQueueService getRouteQueueService() {
107 	return KSBServiceLocator.getMessageQueueService();
108     }
109 
110 }