View Javadoc

1   /*
2    * Copyright 2007-2008 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging;
17  
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.api.config.property.ConfigContext;
20  import org.kuali.rice.ksb.messaging.service.MessageQueueService;
21  import org.kuali.rice.ksb.service.KSBServiceLocator;
22  import org.kuali.rice.ksb.util.KSBConstants;
23  import org.kuali.rice.ksb.util.KSBConstants.Config;
24  import org.springframework.transaction.TransactionStatus;
25  import org.springframework.transaction.support.TransactionCallback;
26  
27  /**
28   * Fetches messages from the db. Marks as 'R'. Gives messages to ThreadPool for execution
29   * 
30   * @author Kuali Rice Team (rice.collab@kuali.org)
31   * 
32   */
33  public class MessageFetcher implements Runnable {
34  
35      private static final Logger LOG = Logger.getLogger(MessageFetcher.class);
36  
37      private Integer maxMessages;
38      private Long routeQueueId;
39  
40      public MessageFetcher(Integer maxMessages) {
41  	this.maxMessages = maxMessages;
42      }
43  
44      public MessageFetcher(Long routeQueueId) {
45  	this.routeQueueId = routeQueueId;
46      }
47  
48      public void run() {
49      	if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, false)) {
50      		try {
51      			requeueDocument();
52      			requeueMessages();
53      		} catch (Throwable t) {
54      			LOG.error("Failed to fetch messages.", t);
55      		}
56      	}
57      }
58  
59      private void requeueMessages() {
60  	if (this.routeQueueId == null) {
61  	    try {
62  		for (final PersistedMessageBO message : getRouteQueueService().getNextDocuments(maxMessages)) {
63  		    markEnrouteAndSaveMessage(message);
64  		    executeMessage(message);
65  		}
66  	    } catch (Throwable t) {
67  		LOG.error("Failed to fetch or process some messages during requeueMessages", t);
68  	    }
69  	}
70      }
71  
72      private void requeueDocument() {
73  	try {
74  	    if (this.routeQueueId != null) {
75  		PersistedMessageBO message = getRouteQueueService().findByRouteQueueId(this.routeQueueId);
76  		message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
77  		getRouteQueueService().save(message);
78  		executeMessage(message);
79  	    }
80  	} catch (Throwable t) {
81  	    LOG.error("Failed to fetch or process some messages during requeueDocument", t);
82  	}
83      }
84  
85      private void executeMessage(PersistedMessageBO message) {
86  	try {
87  	    KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
88  	} catch (Throwable t) {
89  	    LOG.error("Failed to place message " + message + " in thread pool for execution", t);
90  	}
91      }
92  
93      private void markEnrouteAndSaveMessage(final PersistedMessageBO message) {
94  	try {
95  	    KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback() {
96  		public Object doInTransaction(TransactionStatus status) {
97  		    message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
98  		    getRouteQueueService().save(message);
99  		    return null;
100 		}
101 	    });
102 	} catch (Throwable t) {
103 	    LOG.error("Caught error attempting to mark message " + message + " as R", t);
104 	}
105     }
106 
107     private MessageQueueService getRouteQueueService() {
108 	return KSBServiceLocator.getMessageQueueService();
109     }
110 
111 }