View Javadoc

1   /*
2    * Copyright 2007 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.service.impl;
17  
18  import java.sql.Timestamp;
19  import java.util.List;
20  import java.util.Map;
21  
22  import javax.xml.namespace.QName;
23  
24  import org.apache.log4j.Logger;
25  import org.kuali.rice.core.config.ConfigContext;
26  import org.kuali.rice.core.util.RiceUtilities;
27  import org.kuali.rice.ksb.messaging.AsynchronousCall;
28  import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
29  import org.kuali.rice.ksb.messaging.PersistedMessage;
30  import org.kuali.rice.ksb.messaging.ServiceInfo;
31  import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
32  import org.kuali.rice.ksb.messaging.service.MessageQueueService;
33  import org.kuali.rice.ksb.util.KSBConstants;
34  
35  
36  public class MessageQueueServiceImpl implements MessageQueueService {
37  
38      
39      	private static final Logger LOG = Logger.getLogger(MessageQueueServiceImpl.class);
40      private MessageQueueDAO messageQueueDAO;
41  
42      public void delete(PersistedMessage routeQueue) {
43  	    if (new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.MESSAGE_PERSISTENCE))) {
44  		if (LOG.isDebugEnabled()) {
45  		    LOG.debug("Message Persistence is on.  Deleting stored message" + routeQueue);
46  		}
47  	this.getMessageQueueDAO().remove(routeQueue);
48      }
49  	}
50  
51      public void save(PersistedMessage routeQueue) {
52  	    if (new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.MESSAGE_PERSISTENCE))) {
53  		if (LOG.isDebugEnabled()) {
54  			LOG.debug("Persisting Message " + routeQueue);
55  		}
56  	this.getMessageQueueDAO().save(routeQueue);
57      }
58  	}
59  
60      public List<PersistedMessage> findAll() {
61  	return this.getMessageQueueDAO().findAll();
62      }
63  
64      public List<PersistedMessage> findAll(int maxRows) {
65  	return this.getMessageQueueDAO().findAll(maxRows);
66      }
67  
68      public PersistedMessage findByRouteQueueId(Long routeQueueId) {
69  	return getMessageQueueDAO().findByRouteQueueId(routeQueueId);
70      }
71      
72      public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
73  	return messageQueueDAO.findByPersistedMessageByRouteQueueId(routeQueueId);
74      }
75  
76      public List<PersistedMessage> getNextDocuments(Integer maxDocuments) {
77  	return this.getMessageQueueDAO().getNextDocuments(maxDocuments);
78      }
79  
80      public MessageQueueDAO getMessageQueueDAO() {
81  	return this.messageQueueDAO;
82      }
83  
84      public void setMessageQueueDAO(MessageQueueDAO queueDAO) {
85  	this.messageQueueDAO = queueDAO;
86      }
87  
88      public List<PersistedMessage> findByServiceName(QName serviceName, String methodName) {
89  	return getMessageQueueDAO().findByServiceName(serviceName, methodName);
90      }
91  
92      public List<PersistedMessage> findByValues(Map<String, String> criteriaValues, int maxRows) {
93  	return getMessageQueueDAO().findByValues(criteriaValues, maxRows);
94      }
95  
96      public Integer getMaxRetryAttempts() {
97  	return new Integer(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.ROUTE_QUEUE_MAX_RETRY_ATTEMPTS_KEY));
98      }
99  
100     public PersistedMessage getMessage(ServiceInfo serviceInfo, AsynchronousCall methodCall) {
101 	PersistedMessage message = new PersistedMessage();
102 	message.setPayload(new PersistedMessagePayload(methodCall, message));
103 	message.setIpNumber(RiceUtilities.getIpNumber());
104 	message.setServiceName(serviceInfo.getQname().toString());
105 	    message.setQueueDate(new Timestamp(System.currentTimeMillis()));
106 	if (serviceInfo.getServiceDefinition().getPriority() == null) {
107 	    message.setQueuePriority(KSBConstants.ROUTE_QUEUE_DEFAULT_PRIORITY);
108 	} else {
109 	    message.setQueuePriority(serviceInfo.getServiceDefinition().getPriority());
110 	}
111 	message.setQueueStatus(KSBConstants.ROUTE_QUEUE_QUEUED);
112 	message.setRetryCount(0);
113 	if (serviceInfo.getServiceDefinition().getMillisToLive() > 0) {
114 			message.setExpirationDate(new Timestamp(System.currentTimeMillis() + serviceInfo.getServiceDefinition().getMillisToLive()));
115 	}
116 	message.setServiceNamespace(ConfigContext.getCurrentContextConfig().getServiceNamespace());
117 	message.setMethodName(methodCall.getMethodName());
118 	return message;
119     }
120 }