1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.service.impl;
17
18 import java.sql.Timestamp;
19 import java.util.List;
20 import java.util.Map;
21
22 import javax.xml.namespace.QName;
23
24 import org.apache.log4j.Logger;
25 import org.kuali.rice.core.config.ConfigContext;
26 import org.kuali.rice.core.util.RiceUtilities;
27 import org.kuali.rice.ksb.messaging.AsynchronousCall;
28 import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
29 import org.kuali.rice.ksb.messaging.PersistedMessage;
30 import org.kuali.rice.ksb.messaging.ServiceInfo;
31 import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
32 import org.kuali.rice.ksb.messaging.service.MessageQueueService;
33 import org.kuali.rice.ksb.util.KSBConstants;
34
35
36 public class MessageQueueServiceImpl implements MessageQueueService {
37
38
39 private static final Logger LOG = Logger.getLogger(MessageQueueServiceImpl.class);
40 private MessageQueueDAO messageQueueDAO;
41
42 public void delete(PersistedMessage routeQueue) {
43 if (new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.MESSAGE_PERSISTENCE))) {
44 if (LOG.isDebugEnabled()) {
45 LOG.debug("Message Persistence is on. Deleting stored message" + routeQueue);
46 }
47 this.getMessageQueueDAO().remove(routeQueue);
48 }
49 }
50
51 public void save(PersistedMessage routeQueue) {
52 if (new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.MESSAGE_PERSISTENCE))) {
53 if (LOG.isDebugEnabled()) {
54 LOG.debug("Persisting Message " + routeQueue);
55 }
56 this.getMessageQueueDAO().save(routeQueue);
57 }
58 }
59
60 public List<PersistedMessage> findAll() {
61 return this.getMessageQueueDAO().findAll();
62 }
63
64 public List<PersistedMessage> findAll(int maxRows) {
65 return this.getMessageQueueDAO().findAll(maxRows);
66 }
67
68 public PersistedMessage findByRouteQueueId(Long routeQueueId) {
69 return getMessageQueueDAO().findByRouteQueueId(routeQueueId);
70 }
71
72 public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
73 return messageQueueDAO.findByPersistedMessageByRouteQueueId(routeQueueId);
74 }
75
76 public List<PersistedMessage> getNextDocuments(Integer maxDocuments) {
77 return this.getMessageQueueDAO().getNextDocuments(maxDocuments);
78 }
79
80 public MessageQueueDAO getMessageQueueDAO() {
81 return this.messageQueueDAO;
82 }
83
84 public void setMessageQueueDAO(MessageQueueDAO queueDAO) {
85 this.messageQueueDAO = queueDAO;
86 }
87
88 public List<PersistedMessage> findByServiceName(QName serviceName, String methodName) {
89 return getMessageQueueDAO().findByServiceName(serviceName, methodName);
90 }
91
92 public List<PersistedMessage> findByValues(Map<String, String> criteriaValues, int maxRows) {
93 return getMessageQueueDAO().findByValues(criteriaValues, maxRows);
94 }
95
96 public Integer getMaxRetryAttempts() {
97 return new Integer(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.ROUTE_QUEUE_MAX_RETRY_ATTEMPTS_KEY));
98 }
99
100 public PersistedMessage getMessage(ServiceInfo serviceInfo, AsynchronousCall methodCall) {
101 PersistedMessage message = new PersistedMessage();
102 message.setPayload(new PersistedMessagePayload(methodCall, message));
103 message.setIpNumber(RiceUtilities.getIpNumber());
104 message.setServiceName(serviceInfo.getQname().toString());
105 message.setQueueDate(new Timestamp(System.currentTimeMillis()));
106 if (serviceInfo.getServiceDefinition().getPriority() == null) {
107 message.setQueuePriority(KSBConstants.ROUTE_QUEUE_DEFAULT_PRIORITY);
108 } else {
109 message.setQueuePriority(serviceInfo.getServiceDefinition().getPriority());
110 }
111 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_QUEUED);
112 message.setRetryCount(0);
113 if (serviceInfo.getServiceDefinition().getMillisToLive() > 0) {
114 message.setExpirationDate(new Timestamp(System.currentTimeMillis() + serviceInfo.getServiceDefinition().getMillisToLive()));
115 }
116 message.setServiceNamespace(ConfigContext.getCurrentContextConfig().getServiceNamespace());
117 message.setMethodName(methodCall.getMethodName());
118 return message;
119 }
120 }