001 /**
002 * Copyright 2005-2013 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.ksb.messaging.dao.impl;
017
018 import org.apache.commons.lang.StringUtils;
019 import org.apache.ojb.broker.query.Criteria;
020 import org.apache.ojb.broker.query.QueryByCriteria;
021 import org.kuali.rice.core.api.config.CoreConfigHelper;
022 import org.kuali.rice.core.api.util.RiceUtilities;
023 import org.kuali.rice.ksb.messaging.PersistedMessageBO;
024 import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
025 import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
026 import org.kuali.rice.ksb.util.KSBConstants;
027 import org.springmodules.orm.ojb.support.PersistenceBrokerDaoSupport;
028
029 import javax.xml.namespace.QName;
030 import java.util.List;
031 import java.util.Map;
032
033
034 public class MessageQueueDAOOjbImpl extends PersistenceBrokerDaoSupport implements MessageQueueDAO {
035
036 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(MessageQueueDAOOjbImpl.class);
037
038 public void remove(PersistedMessageBO routeQueue) {
039 if (LOG.isDebugEnabled()) {
040 LOG.debug("Removing message " + routeQueue);
041 }
042 Criteria crit = new Criteria();
043 crit.addEqualTo("routeQueueId", routeQueue.getRouteQueueId());
044 getPersistenceBrokerTemplate().deleteByQuery(new QueryByCriteria(PersistedMessageBO.class, crit));
045
046 crit = new Criteria();
047 crit.addEqualTo("routeQueueId", routeQueue.getPayload().getRouteQueueId());
048 getPersistenceBrokerTemplate().deleteByQuery(new QueryByCriteria(PersistedMessagePayload.class, crit));
049 }
050
051 public void save(PersistedMessageBO routeQueue) {
052 if (LOG.isDebugEnabled()) {
053 LOG.debug("Persisting message " + routeQueue);
054 }
055 getPersistenceBrokerTemplate().store(routeQueue);
056 routeQueue.getPayload().setRouteQueueId(routeQueue.getRouteQueueId());
057 getPersistenceBrokerTemplate().store(routeQueue.getPayload());
058 }
059
060 @SuppressWarnings("unchecked")
061 public List<PersistedMessageBO> findAll() {
062 if (LOG.isDebugEnabled()) {
063 LOG.debug("Returning all persisted messages");
064 }
065 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(
066 new QueryByCriteria(PersistedMessageBO.class));
067 }
068
069 @SuppressWarnings("unchecked")
070 public List<PersistedMessageBO> findAll(int maxRows) {
071 if (LOG.isDebugEnabled()) {
072 LOG.debug("Finding next " + maxRows + " messages");
073 }
074 QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class);
075 query.setStartAtIndex(0);
076 query.setEndAtIndex(maxRows);
077 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(query);
078 }
079
080 @SuppressWarnings("unchecked")
081 public List<PersistedMessageBO> findByValues(Map<String, String> criteriaValues, int maxRows) {
082 Criteria crit = new Criteria();
083 String value = null;
084 for (String key : criteriaValues.keySet()) {
085 value = criteriaValues.get(key);
086 if (StringUtils.isBlank(key) && StringUtils.isBlank(value)) {
087 throw new IllegalArgumentException("Either the key or value was blank in criteriaValues (" + key + "="
088 + value + ")");
089 }
090
091 // auto-wildcard the statement
092 if (!key.equals("routeQueueId")) {
093 if (value.contains("*")) {
094 value = value.replace("*", "%");
095 } else {
096 value = value.concat("%");
097 }
098 }
099 if (!StringUtils.containsOnly(value, "%")) {
100 crit.addLike(key, value);
101 }
102 }
103 QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class, crit);
104 query.setFetchSize(maxRows);
105 query.setStartAtIndex(0);
106 query.setEndAtIndex(maxRows);
107 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(query);
108 }
109
110 public PersistedMessageBO findByRouteQueueId(Long routeQueueId) {
111 Criteria criteria = new Criteria();
112 criteria.addEqualTo("routeQueueId", routeQueueId);
113 return (PersistedMessageBO) getPersistenceBrokerTemplate().getObjectByQuery(
114 new QueryByCriteria(PersistedMessageBO.class, criteria));
115 }
116
117 public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
118 Criteria criteria = new Criteria();
119 criteria.addEqualTo("routeQueueId", routeQueueId);
120 return (PersistedMessagePayload) getPersistenceBrokerTemplate().getObjectByQuery(
121 new QueryByCriteria(PersistedMessagePayload.class, criteria));
122 }
123
124 @SuppressWarnings("unchecked")
125 public List<PersistedMessageBO> getNextDocuments(Integer maxDocuments) {
126 Criteria crit = new Criteria();
127 String applicationId = CoreConfigHelper.getApplicationId();
128 crit.addEqualTo("applicationId", applicationId);
129 crit.addNotEqualTo("queueStatus", KSBConstants.ROUTE_QUEUE_EXCEPTION);
130 crit.addEqualTo("ipNumber", RiceUtilities.getIpNumber());
131
132 QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class, crit);
133 query.addOrderByAscending("queuePriority");
134 query.addOrderByAscending("routeQueueId");
135 query.addOrderByAscending("queueDate");
136 if (maxDocuments != null)
137 query.setEndAtIndex(maxDocuments.intValue());
138 return (List) getPersistenceBrokerTemplate().getCollectionByQuery(query);
139 }
140
141 @SuppressWarnings("unchecked")
142 public List<PersistedMessageBO> findByServiceName(QName serviceName, String methodName) {
143 if (LOG.isDebugEnabled()) {
144 LOG.debug("Finding messages for service name " + serviceName);
145 }
146 Criteria crit = new Criteria();
147 crit.addEqualTo("serviceName", serviceName.toString());
148 crit.addEqualTo("methodName", methodName);
149 return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(
150 new QueryByCriteria(PersistedMessageBO.class, crit));
151 }
152
153 }