001 /** 002 * Copyright 2005-2014 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.ksb.messaging; 017 018 import org.apache.log4j.Logger; 019 import org.kuali.rice.core.api.config.property.ConfigContext; 020 import org.kuali.rice.ksb.messaging.service.MessageQueueService; 021 import org.kuali.rice.ksb.service.KSBServiceLocator; 022 import org.kuali.rice.ksb.util.KSBConstants; 023 import org.springframework.transaction.TransactionStatus; 024 import org.springframework.transaction.support.TransactionCallback; 025 026 /** 027 * Fetches messages from the db. Marks as 'R'. Gives messages to ThreadPool for execution 028 * 029 * @author Kuali Rice Team (rice.collab@kuali.org) 030 */ 031 public class MessageFetcher implements Runnable { 032 033 private static final Logger LOG = Logger.getLogger(MessageFetcher.class); 034 035 private Integer maxMessages; 036 private Long routeQueueId; 037 038 public MessageFetcher(Integer maxMessages) { 039 this.maxMessages = maxMessages; 040 } 041 042 public MessageFetcher(Long routeQueueId) { 043 this.routeQueueId = routeQueueId; 044 } 045 046 public void run() { 047 if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, 048 false)) { 049 try { 050 requeueDocument(); 051 requeueMessages(); 052 } catch (Throwable t) { 053 LOG.error("Failed to fetch messages.", t); 054 } 055 } 056 } 057 058 private void requeueMessages() { 059 if (this.routeQueueId == null) { 060 try { 061 for (PersistedMessageBO message : getRouteQueueService().getNextDocuments(maxMessages)) { 062 message = markEnrouteAndSaveMessage(message); 063 executeMessage(message); 064 } 065 } catch (Throwable t) { 066 LOG.error("Failed to fetch or process some messages during requeueMessages", t); 067 } 068 } 069 } 070 071 private void requeueDocument() { 072 try { 073 if (this.routeQueueId != null) { 074 PersistedMessageBO message = getRouteQueueService().findByRouteQueueId(this.routeQueueId); 075 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING); 076 message = getRouteQueueService().save(message); 077 executeMessage(message); 078 } 079 } catch (Throwable t) { 080 LOG.error("Failed to fetch or process some messages during requeueDocument", t); 081 } 082 } 083 084 private void executeMessage(PersistedMessageBO message) { 085 try { 086 KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message)); 087 } catch (Throwable t) { 088 LOG.error("Failed to place message " + message + " in thread pool for execution", t); 089 } 090 } 091 092 private PersistedMessageBO markEnrouteAndSaveMessage(final PersistedMessageBO message) { 093 try { 094 return KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<PersistedMessageBO>() { 095 public PersistedMessageBO doInTransaction(TransactionStatus status) { 096 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING); 097 return getRouteQueueService().save(message); 098 } 099 }); 100 } catch (Throwable t) { 101 LOG.error("Caught error attempting to mark message " + message + " as R", t); 102 } 103 return message; 104 } 105 106 private MessageQueueService getRouteQueueService() { 107 return KSBServiceLocator.getMessageQueueService(); 108 } 109 110 }