001 /** 002 * Copyright 2005-2011 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.ksb.messaging; 017 018 import org.apache.log4j.Logger; 019 import org.kuali.rice.core.api.config.property.ConfigContext; 020 import org.kuali.rice.ksb.messaging.service.MessageQueueService; 021 import org.kuali.rice.ksb.service.KSBServiceLocator; 022 import org.kuali.rice.ksb.util.KSBConstants; 023 import org.kuali.rice.ksb.util.KSBConstants.Config; 024 import org.springframework.transaction.TransactionStatus; 025 import org.springframework.transaction.support.TransactionCallback; 026 027 /** 028 * Fetches messages from the db. Marks as 'R'. Gives messages to ThreadPool for execution 029 * 030 * @author Kuali Rice Team (rice.collab@kuali.org) 031 * 032 */ 033 public class MessageFetcher implements Runnable { 034 035 private static final Logger LOG = Logger.getLogger(MessageFetcher.class); 036 037 private Integer maxMessages; 038 private Long routeQueueId; 039 040 public MessageFetcher(Integer maxMessages) { 041 this.maxMessages = maxMessages; 042 } 043 044 public MessageFetcher(Long routeQueueId) { 045 this.routeQueueId = routeQueueId; 046 } 047 048 public void run() { 049 if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, false)) { 050 try { 051 requeueDocument(); 052 requeueMessages(); 053 } catch (Throwable t) { 054 LOG.error("Failed to fetch messages.", t); 055 } 056 } 057 } 058 059 private void requeueMessages() { 060 if (this.routeQueueId == null) { 061 try { 062 for (final PersistedMessageBO message : getRouteQueueService().getNextDocuments(maxMessages)) { 063 markEnrouteAndSaveMessage(message); 064 executeMessage(message); 065 } 066 } catch (Throwable t) { 067 LOG.error("Failed to fetch or process some messages during requeueMessages", t); 068 } 069 } 070 } 071 072 private void requeueDocument() { 073 try { 074 if (this.routeQueueId != null) { 075 PersistedMessageBO message = getRouteQueueService().findByRouteQueueId(this.routeQueueId); 076 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING); 077 getRouteQueueService().save(message); 078 executeMessage(message); 079 } 080 } catch (Throwable t) { 081 LOG.error("Failed to fetch or process some messages during requeueDocument", t); 082 } 083 } 084 085 private void executeMessage(PersistedMessageBO message) { 086 try { 087 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message)); 088 } catch (Throwable t) { 089 LOG.error("Failed to place message " + message + " in thread pool for execution", t); 090 } 091 } 092 093 private void markEnrouteAndSaveMessage(final PersistedMessageBO message) { 094 try { 095 KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback() { 096 public Object doInTransaction(TransactionStatus status) { 097 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING); 098 getRouteQueueService().save(message); 099 return null; 100 } 101 }); 102 } catch (Throwable t) { 103 LOG.error("Caught error attempting to mark message " + message + " as R", t); 104 } 105 } 106 107 private MessageQueueService getRouteQueueService() { 108 return KSBServiceLocator.getMessageQueueService(); 109 } 110 111 }