View Javadoc

1   /**
2    * Copyright 2005-2011 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.service.impl;
17  
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.api.config.CoreConfigHelper;
20  import org.kuali.rice.core.api.config.property.ConfigContext;
21  import org.kuali.rice.core.api.util.RiceUtilities;
22  import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
23  import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
24  import org.kuali.rice.ksb.messaging.PersistedMessageBO;
25  import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
26  import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
27  import org.kuali.rice.ksb.messaging.service.MessageQueueService;
28  import org.kuali.rice.ksb.util.KSBConstants;
29  
30  import javax.xml.namespace.QName;
31  import java.sql.Timestamp;
32  import java.util.List;
33  import java.util.Map;
34  
35  public class MessageQueueServiceImpl implements MessageQueueService {
36  
37  
38      private static final Logger LOG = Logger.getLogger(MessageQueueServiceImpl.class);
39      private MessageQueueDAO messageQueueDao;
40  
41      public void delete(PersistedMessageBO routeQueue) {
42          if (Boolean.valueOf(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE))) {
43              if (LOG.isDebugEnabled()) {
44                  LOG.debug("Message Persistence is on.  Deleting stored message" + routeQueue);
45              }
46              this.getMessageQueueDao().remove(routeQueue);
47          }
48      }
49  
50      public void save(PersistedMessageBO routeQueue) {
51          if (Boolean.valueOf(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_PERSISTENCE))) {
52              if (LOG.isDebugEnabled()) {
53                  LOG.debug("Persisting Message " + routeQueue);
54              }
55              this.getMessageQueueDao().save(routeQueue);
56          }
57      }
58  
59      public List<PersistedMessageBO> findAll() {
60          return this.getMessageQueueDao().findAll();
61      }
62  
63      public List<PersistedMessageBO> findAll(int maxRows) {
64          return this.getMessageQueueDao().findAll(maxRows);
65      }
66  
67      public PersistedMessageBO findByRouteQueueId(Long routeQueueId) {
68          return getMessageQueueDao().findByRouteQueueId(routeQueueId);
69      }
70  
71      public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
72          return messageQueueDao.findByPersistedMessageByRouteQueueId(routeQueueId);
73      }
74  
75      public List<PersistedMessageBO> getNextDocuments(Integer maxDocuments) {
76          return this.getMessageQueueDao().getNextDocuments(maxDocuments);
77      }
78  
79      public MessageQueueDAO getMessageQueueDao() {
80          return this.messageQueueDao;
81      }
82  
83      public void setMessageQueueDao(MessageQueueDAO queueDAO) {
84          this.messageQueueDao = queueDAO;
85      }
86  
87      public List<PersistedMessageBO> findByServiceName(QName serviceName, String methodName) {
88          return getMessageQueueDao().findByServiceName(serviceName, methodName);
89      }
90  
91      public List<PersistedMessageBO> findByValues(Map<String, String> criteriaValues, int maxRows) {
92          return getMessageQueueDao().findByValues(criteriaValues, maxRows);
93      }
94  
95      public Integer getMaxRetryAttempts() {
96          return new Integer(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ROUTE_QUEUE_MAX_RETRY_ATTEMPTS_KEY));
97      }
98  
99      public PersistedMessageBO getMessage(ServiceConfiguration serviceConfiguration, AsynchronousCall methodCall) {
100         PersistedMessageBO message = new PersistedMessageBO();
101         message.setPayload(new PersistedMessagePayload(methodCall, message));
102         message.setIpNumber(RiceUtilities.getIpNumber());
103         message.setServiceName(serviceConfiguration.getServiceName().toString());
104         message.setQueueDate(new Timestamp(System.currentTimeMillis()));
105         if (serviceConfiguration.getPriority() == null) {
106             message.setQueuePriority(KSBConstants.ROUTE_QUEUE_DEFAULT_PRIORITY);
107         } else {
108             message.setQueuePriority(serviceConfiguration.getPriority());
109         }
110         message.setQueueStatus(KSBConstants.ROUTE_QUEUE_QUEUED);
111         message.setRetryCount(0);
112         if (serviceConfiguration.getMillisToLive() > 0) {
113             message.setExpirationDate(new Timestamp(System.currentTimeMillis() + serviceConfiguration.getMillisToLive()));
114         }
115         message.setApplicationId(CoreConfigHelper.getApplicationId());
116         message.setMethodName(methodCall.getMethodName());
117         return message;
118     }
119 }