View Javadoc
1   /**
2    * Copyright 2005-2015 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.exceptionhandling;
17  
18  import java.sql.Timestamp;
19  
20  import org.apache.commons.lang.StringUtils;
21  import org.apache.log4j.Logger;
22  import org.kuali.rice.core.api.config.property.ConfigContext;
23  import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
24  import org.kuali.rice.ksb.messaging.PersistedMessageBO;
25  import org.kuali.rice.ksb.service.KSBServiceLocator;
26  import org.kuali.rice.ksb.util.KSBConstants;
27  
28  
29  /**
30   * Default implementation of the {@link MessageExceptionHandler} which handles exceptions thrown from message processing.
31   * 
32   * @author Kuali Rice Team (rice.collab@kuali.org)
33   */
34  public class DefaultMessageExceptionHandler implements MessageExceptionHandler {
35  
36      private static final Logger LOG = Logger.getLogger(DefaultMessageExceptionHandler.class);
37  
38      private static final long DEFAULT_TIME_INCREMENT = 60 * 60 * 1000;
39  
40      private static final int DEFAULT_MAX_RETRIES = 7;
41  
42      public void handleException(Throwable throwable, PersistedMessageBO message, Object service) throws Exception {
43          if (isInException(message)) {
44              placeInException(throwable, message);
45          } else {
46              requeue(throwable, message);
47          }
48      }
49  
50      public void handleExceptionLastDitchEffort(Throwable throwable, PersistedMessageBO message, Object service) throws Exception {
51  		LOG.error("Complete failure when attempting to put message into exception routing!  Message was: " + message, throwable);
52  	}
53  
54  	public boolean isInException(PersistedMessageBO message) {
55          ServiceConfiguration serviceConfiguration = message.getMethodCall().getServiceConfiguration();
56  
57          if (getImmediateExceptionRouting()) {
58              return true;
59          }
60  
61          Integer globalMaxRetryAttempts = getGlobalMaxRetryAttempts();
62          if (globalMaxRetryAttempts != null) {
63              LOG.info("Global Max Retry has been set, so is overriding other max retry attempts.");
64              LOG.info("Global Max Retry count = " + globalMaxRetryAttempts + ".");
65              return (message.getRetryCount().intValue() >= globalMaxRetryAttempts.intValue());
66          }
67  
68          if (serviceConfiguration.getRetryAttempts() > 0) {
69              LOG.info("Message set for retry exception handling.  Message retry count = " + message.getRetryCount());
70              if (message.getRetryCount() >= serviceConfiguration.getRetryAttempts()) {
71                  return true;
72              }
73          } else if (serviceConfiguration.getMillisToLive() > 0) {
74              LOG.info("Message set for time to live exception handling.  Message expiration date = " + message.getExpirationDate().getTime());
75              if (System.currentTimeMillis() > message.getExpirationDate().getTime()) {
76                  return true;
77              }
78          } else if (message.getRetryCount() >= this.getMaxRetryAttempts()) {
79              LOG.info("Message set for default exception handling.  Comparing retry count = " + message.getRetryCount() + " against default max count.");
80              return true;
81          }
82          return false;
83      }
84  
85      protected void requeue(Throwable throwable, PersistedMessageBO message) throws Exception {
86          Integer retryCount = message.getRetryCount();
87          message.setQueueStatus(KSBConstants.ROUTE_QUEUE_QUEUED);
88          long addMilliseconds = Math.round(getTimeIncrement() * Math.pow(2, retryCount));
89          Timestamp currentTime = message.getQueueDate();
90          Timestamp newTime = new Timestamp(currentTime.getTime() + addMilliseconds);
91          message.setQueueStatus(KSBConstants.ROUTE_QUEUE_QUEUED);
92          message.setRetryCount(new Integer(retryCount + 1));
93          message.setQueueDate(newTime);
94          scheduleExecution(throwable, message);
95      }
96  
97      protected void placeInException(Throwable throwable, PersistedMessageBO message) throws Exception {
98          message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
99          message.setQueueDate(new Timestamp(System.currentTimeMillis()));
100         message = KSBServiceLocator.getMessageQueueService().save(message);
101     }
102 
103     protected void scheduleExecution(Throwable throwable, PersistedMessageBO message) throws Exception {
104         KSBServiceLocator.getExceptionRoutingService().scheduleExecution(throwable, message, null);
105     }
106 
107     public Integer getMaxRetryAttempts() {
108         try {
109             return new Integer(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ROUTE_QUEUE_MAX_RETRY_ATTEMPTS_KEY));
110         } catch (NumberFormatException e) {
111             LOG.error("Constant '" + KSBConstants.Config.ROUTE_QUEUE_MAX_RETRY_ATTEMPTS_KEY + "' is not a number and is being " + "used as a default for exception messages.  " + DEFAULT_MAX_RETRIES + " will be used as a retry limit until this number is fixed");
112             return DEFAULT_MAX_RETRIES;
113         }
114     }
115 
116     public Integer getGlobalMaxRetryAttempts() {
117         String globalMax = ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ROUTE_QUEUE_MAX_RETRY_ATTEMPTS_OVERRIDE_KEY);
118         if (StringUtils.isBlank(globalMax)) {
119             return null;
120         }
121         try {
122             Integer globalMaxRetries = new Integer(globalMax);
123             if (globalMaxRetries >= 0) {
124                 return globalMaxRetries;
125             }
126         } catch (NumberFormatException e) {
127             LOG.error("Constant '" + KSBConstants.Config.ROUTE_QUEUE_MAX_RETRY_ATTEMPTS_OVERRIDE_KEY + "' is not a number and is being " + "used as a default for exception messages.  " + DEFAULT_MAX_RETRIES + " will be used as a retry limit until this number is fixed");
128         }
129         return null;
130     }
131 
132     public Long getTimeIncrement() {
133         try {
134             return new Long(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ROUTE_QUEUE_TIME_INCREMENT_KEY));
135         } catch (NumberFormatException e) {
136             LOG.error("Constant '" + KSBConstants.Config.ROUTE_QUEUE_TIME_INCREMENT_KEY + "' is not a number and will not be used " + "as the default time increment for exception routing.  Default of " + DEFAULT_TIME_INCREMENT + " will be used.");
137             return DEFAULT_TIME_INCREMENT;
138         }
139     }
140 
141     public Boolean getImmediateExceptionRouting() {
142         return new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.IMMEDIATE_EXCEPTION_ROUTING));
143     }
144 }