001    /**
002     * Copyright 2005-2011 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.ksb.messaging.dao.impl;
017    
018    import org.apache.commons.lang.StringUtils;
019    import org.apache.ojb.broker.query.Criteria;
020    import org.apache.ojb.broker.query.QueryByCriteria;
021    import org.kuali.rice.core.api.config.CoreConfigHelper;
022    import org.kuali.rice.core.api.util.RiceUtilities;
023    import org.kuali.rice.ksb.messaging.PersistedMessageBO;
024    import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
025    import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
026    import org.kuali.rice.ksb.util.KSBConstants;
027    import org.springmodules.orm.ojb.support.PersistenceBrokerDaoSupport;
028    
029    import javax.xml.namespace.QName;
030    import java.util.List;
031    import java.util.Map;
032    
033    
034    public class MessageQueueDAOOjbImpl extends PersistenceBrokerDaoSupport implements MessageQueueDAO {
035    
036            private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(MessageQueueDAOOjbImpl.class);
037    
038            public void remove(PersistedMessageBO routeQueue) {
039                    if (LOG.isDebugEnabled()) {
040                            LOG.debug("Removing message " + routeQueue);
041                    }
042                    Criteria crit = new Criteria();
043                    crit.addEqualTo("routeQueueId", routeQueue.getRouteQueueId());
044                    getPersistenceBrokerTemplate().deleteByQuery(new QueryByCriteria(PersistedMessageBO.class, crit));
045    
046                    crit = new Criteria();
047                    crit.addEqualTo("routeQueueId", routeQueue.getPayload().getRouteQueueId());
048                    getPersistenceBrokerTemplate().deleteByQuery(new QueryByCriteria(PersistedMessagePayload.class, crit));
049            }
050    
051            public void save(PersistedMessageBO routeQueue) {
052                    if (LOG.isDebugEnabled()) {
053                            LOG.debug("Persisting message " + routeQueue);
054                    }
055                    getPersistenceBrokerTemplate().store(routeQueue);
056                    routeQueue.getPayload().setRouteQueueId(routeQueue.getRouteQueueId());
057                    getPersistenceBrokerTemplate().store(routeQueue.getPayload());
058            }
059    
060            @SuppressWarnings("unchecked")
061            public List<PersistedMessageBO> findAll() {
062                    if (LOG.isDebugEnabled()) {
063                            LOG.debug("Returning all persisted messages");
064                    }
065            return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(
066                    new QueryByCriteria(PersistedMessageBO.class));
067            }
068    
069            @SuppressWarnings("unchecked")
070            public List<PersistedMessageBO> findAll(int maxRows) {
071                    if (LOG.isDebugEnabled()) {
072                            LOG.debug("Finding next " + maxRows + " messages");
073                    }
074                    QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class);
075                    query.setStartAtIndex(0);
076                    query.setEndAtIndex(maxRows);
077                    return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(query);
078            }
079    
080            @SuppressWarnings("unchecked")
081        public List<PersistedMessageBO> findByValues(Map<String, String> criteriaValues, int maxRows) {
082                    Criteria crit = new Criteria();
083                    String value = null;
084                    for (String key : criteriaValues.keySet()) {
085                            value = criteriaValues.get(key);
086                            if (StringUtils.isBlank(key) && StringUtils.isBlank(value)) {
087                    throw new IllegalArgumentException("Either the key or value was blank in criteriaValues (" + key + "="
088                            + value + ")");
089                            }
090    
091                            // auto-wildcard the statement
092                if (!key.equals("routeQueueId")) {
093                            if (value.contains("*")) {
094                                    value = value.replace("*", "%");
095                            } else {
096                                    value = value.concat("%");
097                            }
098                }
099                    if (!StringUtils.containsOnly(value, "%")) {
100                            crit.addLike(key, value);
101                    }
102            }
103            QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class, crit);
104            query.setFetchSize(maxRows);
105            query.setStartAtIndex(0);
106            query.setEndAtIndex(maxRows);
107            return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(query);
108        }
109    
110            public PersistedMessageBO findByRouteQueueId(Long routeQueueId) {
111                    Criteria criteria = new Criteria();
112                    criteria.addEqualTo("routeQueueId", routeQueueId);
113            return (PersistedMessageBO) getPersistenceBrokerTemplate().getObjectByQuery(
114                    new QueryByCriteria(PersistedMessageBO.class, criteria));
115            }
116    
117        public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
118            Criteria criteria = new Criteria();
119            criteria.addEqualTo("routeQueueId", routeQueueId);
120            return (PersistedMessagePayload) getPersistenceBrokerTemplate().getObjectByQuery(
121                    new QueryByCriteria(PersistedMessagePayload.class, criteria));
122            }
123    
124            @SuppressWarnings("unchecked")
125            public List<PersistedMessageBO> getNextDocuments(Integer maxDocuments) {
126                    Criteria crit = new Criteria();
127                    String applicationId = CoreConfigHelper.getApplicationId();
128                    crit.addEqualTo("applicationId", applicationId);
129                    crit.addNotEqualTo("queueStatus", KSBConstants.ROUTE_QUEUE_EXCEPTION);
130                    crit.addEqualTo("ipNumber", RiceUtilities.getIpNumber());
131    
132                    QueryByCriteria query = new QueryByCriteria(PersistedMessageBO.class, crit);
133                    query.addOrderByAscending("queuePriority");
134                    query.addOrderByAscending("routeQueueId");
135                    query.addOrderByAscending("queueDate");
136                    if (maxDocuments != null)
137                            query.setEndAtIndex(maxDocuments.intValue());
138                    return (List) getPersistenceBrokerTemplate().getCollectionByQuery(query);
139            }
140    
141            @SuppressWarnings("unchecked")
142            public List<PersistedMessageBO> findByServiceName(QName serviceName, String methodName) {
143                    if (LOG.isDebugEnabled()) {
144                            LOG.debug("Finding messages for service name " + serviceName);
145                    }
146                    Criteria crit = new Criteria();
147                    crit.addEqualTo("serviceName", serviceName.toString());
148                    crit.addEqualTo("methodName", methodName);
149            return (List<PersistedMessageBO>) getPersistenceBrokerTemplate().getCollectionByQuery(
150                    new QueryByCriteria(PersistedMessageBO.class, crit));
151            }
152    
153    }