/**
 * Copyright 2005-2011 The Kuali Foundation
 *
 * Licensed under the Educational Community License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.opensource.org/licenses/ecl2.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.kuali.rice.ksb.messaging.dao.impl;

import org.apache.commons.lang.StringUtils;
import org.apache.ojb.broker.query.Criteria;
import org.apache.ojb.broker.query.QueryByCriteria;
import org.kuali.rice.core.api.config.CoreConfigHelper;
import org.kuali.rice.core.api.util.RiceUtilities;
import org.kuali.rice.ksb.messaging.PersistedMessageBO;
import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
import org.kuali.rice.ksb.util.KSBConstants;
import org.springmodules.orm.ojb.support.PersistenceBrokerDaoSupport;

import javax.xml.namespace.QName;
import java.util.List;
import java.util.Map;


/**
 * OJB implementation of {@link MessageQueueDAO}. All persistence work is delegated to the
 * persistence broker template supplied by {@link PersistenceBrokerDaoSupport}.
 */
public class MessageQueueDAOOjbImpl extends PersistenceBrokerDaoSupport implements MessageQueueDAO {

    private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(MessageQueueDAOOjbImpl.class);

    /**
     * Deletes the given message and then its payload, each via a delete-by-query on the
     * {@code routeQueueId} field.
     *
     * @param routeQueue the message to delete; its payload is read to obtain the payload's route queue id
     */
    public void remove(PersistedMessageBO routeQueue) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Removing message " + routeQueue);
        }
        Criteria messageCriteria = new Criteria();
        messageCriteria.addEqualTo("routeQueueId", routeQueue.getRouteQueueId());
        getPersistenceBrokerTemplate().deleteByQuery(new QueryByCriteria(PersistedMessageBO.class, messageCriteria));

        Criteria payloadCriteria = new Criteria();
        payloadCriteria.addEqualTo("routeQueueId", routeQueue.getPayload().getRouteQueueId());
        getPersistenceBrokerTemplate().deleteByQuery(
                new QueryByCriteria(PersistedMessagePayload.class, payloadCriteria));
    }

    /**
     * Stores the given message, copies the message's route queue id onto its payload and then
     * stores the payload.
     *
     * @param routeQueue the message to persist together with its payload
     */
    public void save(PersistedMessageBO routeQueue) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Persisting message " + routeQueue);
        }
        // The message is stored before the payload; presumably this is what makes the id
        // available to copy onto the payload (TODO confirm against the OJB mapping).
        getPersistenceBrokerTemplate().store(routeQueue);
        routeQueue.getPayload().setRouteQueueId(routeQueue.getRouteQueueId());
        getPersistenceBrokerTemplate().store(routeQueue.getPayload());
    }

    /**
     * Returns every persisted message, using a query with no criteria.
     *
     * @return all persisted messages
     */
    @SuppressWarnings("unchecked")
    public List<PersistedMessageBO> findAll() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Returning all persisted messages");
        }
        return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(
                new QueryByCriteria(PersistedMessageBO.class));
    }

    /**
     * Returns persisted messages using a query with no criteria whose start index is 0 and whose
     * end index is {@code maxRows}.
     *
     * @param maxRows the value used as the query's end index
     * @return the messages returned by the windowed query
     */
    @SuppressWarnings("unchecked")
    public List<PersistedMessageBO> findAll(int maxRows) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Finding next " + maxRows + " messages");
        }
        QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class);
        query.setStartAtIndex(0);
        query.setEndAtIndex(maxRows);
        return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(query);
    }

    /**
     * Finds messages whose fields match the supplied values using LIKE comparisons.
     * <p>
     * For every key other than {@code routeQueueId} the value is auto-wildcarded: any {@code *} is
     * replaced with {@code %}, and if the value contains no {@code *} a trailing {@code %} is
     * appended. A value that ends up consisting only of {@code %} characters (including an empty
     * value) adds no criterion for that key.
     *
     * @param criteriaValues field name to search value; keys must not be blank and values must not be null
     * @param maxRows used as both the fetch size and the end index of the query
     * @return the matching messages
     * @throws IllegalArgumentException if an entry has a blank key or a null value
     */
    @SuppressWarnings("unchecked")
    public List<PersistedMessageBO> findByValues(Map<String, String> criteriaValues, int maxRows) {
        Criteria crit = new Criteria();
        for (Map.Entry<String, String> entry : criteriaValues.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            // A blank key can never name a field and a null value cannot be wildcarded, so both
            // are rejected up front. An empty value is still allowed: it becomes "%" below and is
            // then skipped, i.e. it means "no filter on this field".
            if (StringUtils.isBlank(key) || value == null) {
                throw new IllegalArgumentException("Either the key or value was blank in criteriaValues (" + key + "="
                        + value + ")");
            }

            // auto-wildcard the statement
            if (!key.equals("routeQueueId")) {
                if (value.contains("*")) {
                    value = value.replace("*", "%");
                } else {
                    value = value.concat("%");
                }
            }
            // Skip values made up solely of wildcards; they would not restrict the result.
            if (!StringUtils.containsOnly(value, "%")) {
                crit.addLike(key, value);
            }
        }
        QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class, crit);
        query.setFetchSize(maxRows);
        query.setStartAtIndex(0);
        query.setEndAtIndex(maxRows);
        return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(query);
    }

    /**
     * Looks up a single message by its route queue id.
     *
     * @param routeQueueId the id to match against the {@code routeQueueId} field
     * @return the object returned by the broker for that query
     */
    public PersistedMessageBO findByRouteQueueId(Long routeQueueId) {
        Criteria criteria = new Criteria();
        criteria.addEqualTo("routeQueueId", routeQueueId);
        return (PersistedMessageBO) getPersistenceBrokerTemplate().getObjectByQuery(
                new QueryByCriteria(PersistedMessageBO.class, criteria));
    }

    /**
     * Looks up a single message payload by its route queue id.
     *
     * @param routeQueueId the id to match against the payload's {@code routeQueueId} field
     * @return the object returned by the broker for that query
     */
    public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
        Criteria criteria = new Criteria();
        criteria.addEqualTo("routeQueueId", routeQueueId);
        return (PersistedMessagePayload) getPersistenceBrokerTemplate().getObjectByQuery(
                new QueryByCriteria(PersistedMessagePayload.class, criteria));
    }

    /**
     * Fetches messages for this node: those whose {@code applicationId} equals
     * {@link CoreConfigHelper#getApplicationId()}, whose {@code ipNumber} equals
     * {@link RiceUtilities#getIpNumber()} and whose {@code queueStatus} is not
     * {@link KSBConstants#ROUTE_QUEUE_EXCEPTION}. Results are ordered ascending by
     * {@code queuePriority}, then {@code routeQueueId}, then {@code queueDate}.
     *
     * @param maxDocuments the value used as the query's end index, or {@code null} to set no end index
     * @return the matching messages in the order described above
     */
    @SuppressWarnings("unchecked")
    public List<PersistedMessageBO> getNextDocuments(Integer maxDocuments) {
        Criteria crit = new Criteria();
        String applicationId = CoreConfigHelper.getApplicationId();
        crit.addEqualTo("applicationId", applicationId);
        crit.addNotEqualTo("queueStatus", KSBConstants.ROUTE_QUEUE_EXCEPTION);
        crit.addEqualTo("ipNumber", RiceUtilities.getIpNumber());

        QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class, crit);
        query.addOrderByAscending("queuePriority");
        query.addOrderByAscending("routeQueueId");
        query.addOrderByAscending("queueDate");
        if (maxDocuments != null) {
            query.setEndAtIndex(maxDocuments.intValue());
        }
        return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(query);
    }

    /**
     * Finds messages whose {@code serviceName} equals the string form of the given qualified name
     * and whose {@code methodName} equals the given method name.
     *
     * @param serviceName the service's qualified name; compared using {@code serviceName.toString()}
     * @param methodName the method name to match
     * @return the matching messages
     */
    @SuppressWarnings("unchecked")
    public List<PersistedMessageBO> findByServiceName(QName serviceName, String methodName) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Finding messages for service name " + serviceName);
        }
        Criteria crit = new Criteria();
        crit.addEqualTo("serviceName", serviceName.toString());
        crit.addEqualTo("methodName", methodName);
        return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(
                new QueryByCriteria(PersistedMessageBO.class, crit));
    }

}