1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.kuali.rice.ksb.messaging.dao.impl;
18
19 import java.util.List;
20 import java.util.Map;
21
22 import javax.xml.namespace.QName;
23
24 import org.apache.commons.lang.StringUtils;
25 import org.apache.ojb.broker.query.Criteria;
26 import org.apache.ojb.broker.query.QueryByCriteria;
27 import org.kuali.rice.core.api.config.CoreConfigHelper;
28 import org.kuali.rice.core.util.RiceUtilities;
29 import org.kuali.rice.ksb.messaging.PersistedMessageBO;
30 import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
31 import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
32 import org.kuali.rice.ksb.util.KSBConstants;
33 import org.springmodules.orm.ojb.support.PersistenceBrokerDaoSupport;
34
35
36 public class MessageQueueDAOOjbImpl extends PersistenceBrokerDaoSupport implements MessageQueueDAO {
37
38 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(MessageQueueDAOOjbImpl.class);
39
40 public void remove(PersistedMessageBO routeQueue) {
41 if (LOG.isDebugEnabled()) {
42 LOG.debug("Removing message " + routeQueue);
43 }
44 Criteria crit = new Criteria();
45 crit.addEqualTo("routeQueueId", routeQueue.getRouteQueueId());
46 getPersistenceBrokerTemplate().deleteByQuery(new QueryByCriteria(PersistedMessageBO.class, crit));
47
48 crit = new Criteria();
49 crit.addEqualTo("routeQueueId", routeQueue.getPayload().getRouteQueueId());
50 getPersistenceBrokerTemplate().deleteByQuery(new QueryByCriteria(PersistedMessagePayload.class, crit));
51 }
52
53 public void save(PersistedMessageBO routeQueue) {
54 if (LOG.isDebugEnabled()) {
55 LOG.debug("Persisting message " + routeQueue);
56 }
57 getPersistenceBrokerTemplate().store(routeQueue);
58 routeQueue.getPayload().setRouteQueueId(routeQueue.getRouteQueueId());
59 getPersistenceBrokerTemplate().store(routeQueue.getPayload());
60 }
61
62 @SuppressWarnings("unchecked")
63 public List<PersistedMessageBO> findAll() {
64 if (LOG.isDebugEnabled()) {
65 LOG.debug("Returning all persisted messages");
66 }
67 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(
68 new QueryByCriteria(PersistedMessageBO.class));
69 }
70
71 @SuppressWarnings("unchecked")
72 public List<PersistedMessageBO> findAll(int maxRows) {
73 if (LOG.isDebugEnabled()) {
74 LOG.debug("Finding next " + maxRows + " messages");
75 }
76 QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class);
77 query.setStartAtIndex(0);
78 query.setEndAtIndex(maxRows);
79 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(query);
80 }
81
82 @SuppressWarnings("unchecked")
83 public List<PersistedMessageBO> findByValues(Map<String, String> criteriaValues, int maxRows) {
84 Criteria crit = new Criteria();
85 String value = null;
86 for (String key : criteriaValues.keySet()) {
87 value = criteriaValues.get(key);
88 if (StringUtils.isBlank(key) && StringUtils.isBlank(value)) {
89 throw new IllegalArgumentException("Either the key or value was blank in criteriaValues (" + key + "="
90 + value + ")");
91 }
92
93
94 if (!key.equals("routeQueueId")) {
95 if (value.contains("*")) {
96 value = value.replace("*", "%");
97 } else {
98 value = value.concat("%");
99 }
100 }
101 if (!StringUtils.containsOnly(value, "%")) {
102 crit.addLike(key, value);
103 }
104 }
105 QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class, crit);
106 query.setFetchSize(maxRows);
107 query.setStartAtIndex(0);
108 query.setEndAtIndex(maxRows);
109 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(query);
110 }
111
112 public PersistedMessageBO findByRouteQueueId(Long routeQueueId) {
113 Criteria criteria = new Criteria();
114 criteria.addEqualTo("routeQueueId", routeQueueId);
115 return (PersistedMessageBO) getPersistenceBrokerTemplate().getObjectByQuery(
116 new QueryByCriteria(PersistedMessageBO.class, criteria));
117 }
118
119 public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
120 Criteria criteria = new Criteria();
121 criteria.addEqualTo("routeQueueId", routeQueueId);
122 return (PersistedMessagePayload) getPersistenceBrokerTemplate().getObjectByQuery(
123 new QueryByCriteria(PersistedMessagePayload.class, criteria));
124 }
125
126 @SuppressWarnings("unchecked")
127 public List<PersistedMessageBO> getNextDocuments(Integer maxDocuments) {
128 Criteria crit = new Criteria();
129 String applicationId = CoreConfigHelper.getApplicationId();
130 crit.addEqualTo("applicationId", applicationId);
131 crit.addNotEqualTo("queueStatus", KSBConstants.ROUTE_QUEUE_EXCEPTION);
132 crit.addEqualTo("ipNumber", RiceUtilities.getIpNumber());
133
134 QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class, crit);
135 query.addOrderByAscending("queuePriority");
136 query.addOrderByAscending("routeQueueId");
137 query.addOrderByAscending("queueDate");
138 if (maxDocuments != null)
139 query.setEndAtIndex(maxDocuments.intValue());
140 return (List) getPersistenceBrokerTemplate().getCollectionByQuery(query);
141 }
142
143 @SuppressWarnings("unchecked")
144 public List<PersistedMessageBO> findByServiceName(QName serviceName, String methodName) {
145 if (LOG.isDebugEnabled()) {
146 LOG.debug("Finding messages for service name " + serviceName);
147 }
148 Criteria crit = new Criteria();
149 crit.addEqualTo("serviceName", serviceName.toString());
150 crit.addEqualTo("methodName", methodName);
151 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(
152 new QueryByCriteria(PersistedMessageBO.class, crit));
153 }
154
155 }