View Javadoc
1   /**
2    * Copyright 2005-2015 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging;
17  
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.api.config.property.ConfigContext;
20  import org.kuali.rice.ksb.messaging.service.MessageQueueService;
21  import org.kuali.rice.ksb.service.KSBServiceLocator;
22  import org.kuali.rice.ksb.util.KSBConstants;
23  import org.springframework.transaction.TransactionStatus;
24  import org.springframework.transaction.support.TransactionCallback;
25  
26  /**
27   * Fetches messages from the db. Marks as 'R'. Gives messages to ThreadPool for execution
28   *
29   * @author Kuali Rice Team (rice.collab@kuali.org)
30   */
31  public class MessageFetcher implements Runnable {
32  
33      private static final Logger LOG = Logger.getLogger(MessageFetcher.class);
34  
35      private Integer maxMessages;
36      private Long routeQueueId;
37  
38      public MessageFetcher(Integer maxMessages) {
39          this.maxMessages = maxMessages;
40      }
41  
42      public MessageFetcher(Long routeQueueId) {
43          this.routeQueueId = routeQueueId;
44      }
45  
46      public void run() {
47          if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE,
48                  false)) {
49              try {
50                  requeueDocument();
51                  requeueMessages();
52              } catch (Throwable t) {
53                  LOG.error("Failed to fetch messages.", t);
54              }
55          }
56      }
57  
58      private void requeueMessages() {
59          if (this.routeQueueId == null) {
60              try {
61                  for (PersistedMessageBO message : getRouteQueueService().getNextDocuments(maxMessages)) {
62                      message = markEnrouteAndSaveMessage(message);
63                      executeMessage(message);
64                  }
65              } catch (Throwable t) {
66                  LOG.error("Failed to fetch or process some messages during requeueMessages", t);
67              }
68          }
69      }
70  
71      private void requeueDocument() {
72          try {
73              if (this.routeQueueId != null) {
74                  PersistedMessageBO message = getRouteQueueService().findByRouteQueueId(this.routeQueueId);
75                  message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
76                  message = getRouteQueueService().save(message);
77                  executeMessage(message);
78              }
79          } catch (Throwable t) {
80              LOG.error("Failed to fetch or process some messages during requeueDocument", t);
81          }
82      }
83  
84      private void executeMessage(PersistedMessageBO message) {
85          try {
86              KSBServiceLocator.getThreadPool().execute(new MessageServiceInvoker(message));
87          } catch (Throwable t) {
88              LOG.error("Failed to place message " + message + " in thread pool for execution", t);
89          }
90      }
91  
92      private PersistedMessageBO markEnrouteAndSaveMessage(final PersistedMessageBO message) {
93          try {
94              return KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<PersistedMessageBO>() {
95                  public PersistedMessageBO doInTransaction(TransactionStatus status) {
96                      message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
97                      return getRouteQueueService().save(message);
98                  }
99              });
100         } catch (Throwable t) {
101             LOG.error("Caught error attempting to mark message " + message + " as R", t);
102         }
103         return message;
104     }
105 
106     private MessageQueueService getRouteQueueService() {
107         return KSBServiceLocator.getMessageQueueService();
108     }
109 
110 }