1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.dao.impl;
17
18 import org.apache.commons.lang.StringUtils;
19 import org.apache.ojb.broker.query.Criteria;
20 import org.apache.ojb.broker.query.QueryByCriteria;
21 import org.kuali.rice.core.api.config.CoreConfigHelper;
22 import org.kuali.rice.core.api.util.RiceUtilities;
23 import org.kuali.rice.ksb.messaging.PersistedMessageBO;
24 import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
25 import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
26 import org.kuali.rice.ksb.util.KSBConstants;
27 import org.springmodules.orm.ojb.support.PersistenceBrokerDaoSupport;
28
29 import javax.xml.namespace.QName;
30 import java.util.List;
31 import java.util.Map;
32
33
34 public class MessageQueueDAOOjbImpl extends PersistenceBrokerDaoSupport implements MessageQueueDAO {
35
36 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(MessageQueueDAOOjbImpl.class);
37
38 public void remove(PersistedMessageBO routeQueue) {
39 if (LOG.isDebugEnabled()) {
40 LOG.debug("Removing message " + routeQueue);
41 }
42 Criteria crit = new Criteria();
43 crit.addEqualTo("routeQueueId", routeQueue.getRouteQueueId());
44 getPersistenceBrokerTemplate().deleteByQuery(new QueryByCriteria(PersistedMessageBO.class, crit));
45
46 crit = new Criteria();
47 crit.addEqualTo("routeQueueId", routeQueue.getPayload().getRouteQueueId());
48 getPersistenceBrokerTemplate().deleteByQuery(new QueryByCriteria(PersistedMessagePayload.class, crit));
49 }
50
51 public void save(PersistedMessageBO routeQueue) {
52 if (LOG.isDebugEnabled()) {
53 LOG.debug("Persisting message " + routeQueue);
54 }
55 getPersistenceBrokerTemplate().store(routeQueue);
56 routeQueue.getPayload().setRouteQueueId(routeQueue.getRouteQueueId());
57 getPersistenceBrokerTemplate().store(routeQueue.getPayload());
58 }
59
60 @SuppressWarnings("unchecked")
61 public List<PersistedMessageBO> findAll() {
62 if (LOG.isDebugEnabled()) {
63 LOG.debug("Returning all persisted messages");
64 }
65 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(
66 new QueryByCriteria(PersistedMessageBO.class));
67 }
68
69 @SuppressWarnings("unchecked")
70 public List<PersistedMessageBO> findAll(int maxRows) {
71 if (LOG.isDebugEnabled()) {
72 LOG.debug("Finding next " + maxRows + " messages");
73 }
74 QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class);
75 query.setStartAtIndex(0);
76 query.setEndAtIndex(maxRows);
77 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(query);
78 }
79
80 @SuppressWarnings("unchecked")
81 public List<PersistedMessageBO> findByValues(Map<String, String> criteriaValues, int maxRows) {
82 Criteria crit = new Criteria();
83 String value = null;
84 for (String key : criteriaValues.keySet()) {
85 value = criteriaValues.get(key);
86 if (StringUtils.isBlank(key) && StringUtils.isBlank(value)) {
87 throw new IllegalArgumentException("Either the key or value was blank in criteriaValues (" + key + "="
88 + value + ")");
89 }
90
91
92 if (!key.equals("routeQueueId")) {
93 if (value.contains("*")) {
94 value = value.replace("*", "%");
95 } else {
96 value = value.concat("%");
97 }
98 }
99 if (!StringUtils.containsOnly(value, "%")) {
100 crit.addLike(key, value);
101 }
102 }
103 QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class, crit);
104 query.setFetchSize(maxRows);
105 query.setStartAtIndex(0);
106 query.setEndAtIndex(maxRows);
107 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(query);
108 }
109
110 public PersistedMessageBO findByRouteQueueId(Long routeQueueId) {
111 Criteria criteria = new Criteria();
112 criteria.addEqualTo("routeQueueId", routeQueueId);
113 return (PersistedMessageBO) getPersistenceBrokerTemplate().getObjectByQuery(
114 new QueryByCriteria(PersistedMessageBO.class, criteria));
115 }
116
117 public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
118 Criteria criteria = new Criteria();
119 criteria.addEqualTo("routeQueueId", routeQueueId);
120 return (PersistedMessagePayload) getPersistenceBrokerTemplate().getObjectByQuery(
121 new QueryByCriteria(PersistedMessagePayload.class, criteria));
122 }
123
124 @SuppressWarnings("unchecked")
125 public List<PersistedMessageBO> getNextDocuments(Integer maxDocuments) {
126 Criteria crit = new Criteria();
127 String applicationId = CoreConfigHelper.getApplicationId();
128 crit.addEqualTo("applicationId", applicationId);
129 crit.addNotEqualTo("queueStatus", KSBConstants.ROUTE_QUEUE_EXCEPTION);
130 crit.addEqualTo("ipNumber", RiceUtilities.getIpNumber());
131
132 QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class, crit);
133 query.addOrderByAscending("queuePriority");
134 query.addOrderByAscending("routeQueueId");
135 query.addOrderByAscending("queueDate");
136 if (maxDocuments != null)
137 query.setEndAtIndex(maxDocuments.intValue());
138 return (List) getPersistenceBrokerTemplate().getCollectionByQuery(query);
139 }
140
141 @SuppressWarnings("unchecked")
142 public List<PersistedMessageBO> findByServiceName(QName serviceName, String methodName) {
143 if (LOG.isDebugEnabled()) {
144 LOG.debug("Finding messages for service name " + serviceName);
145 }
146 Criteria crit = new Criteria();
147 crit.addEqualTo("serviceName", serviceName.toString());
148 crit.addEqualTo("methodName", methodName);
149 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(
150 new QueryByCriteria(PersistedMessageBO.class, crit));
151 }
152
153 }