001    /**
002     * Copyright 2005-2014 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.kcb.service.impl;
017    
018    import org.apache.log4j.Logger;
019    import org.apache.ojb.broker.query.Criteria;
020    import org.kuali.rice.core.api.util.RiceConstants;
021    import org.kuali.rice.kcb.bo.Message;
022    import org.kuali.rice.kcb.bo.MessageDelivery;
023    import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
024    import org.kuali.rice.kcb.service.MessageDeliveryService;
025    
026    import java.sql.Timestamp;
027    import java.util.ArrayList;
028    import java.util.Collection;
029    import java.util.HashMap;
030    import java.util.Map;
031    
032    /**
033     * MessageDeliveryService implementation 
034     * 
035     * @author Kuali Rice Team (rice.collab@kuali.org)
036     */
037    public class MessageDeliveryServiceImpl extends BusinessObjectServiceImpl implements MessageDeliveryService {
038        private static final Logger LOG = Logger.getLogger(MessageDeliveryServiceImpl.class);
039    
040        /**
041         * Number of processing attempts to make.  {@link MessageDelivery}s with this number or more of attempts
042         * will not be selected for further processing.
043         */
044        private int maxProcessAttempts;
045    
046        /**
047         * Sets the max processing attempts
048         * @param maxProcessAttempts the max delivery attempts
049         */
050        public void setMaxProcessAttempts(int maxProcessAttempts) {
051            this.maxProcessAttempts = maxProcessAttempts;
052        }
053    
054        /**
055         * @see org.kuali.rice.kcb.service.MessageDeliveryService#saveMessageDelivery(org.kuali.rice.kcb.bo.MessageDelivery)
056         */
057        public void saveMessageDelivery(MessageDelivery delivery) {
058            dao.save(delivery);
059        }
060    
061        /**
062         * @see org.kuali.rice.kcb.service.MessageDeliveryService#deleteMessageDelivery(java.lang.Long)
063         */
064        public void deleteMessageDelivery(MessageDelivery messageDelivery) {
065            dao.delete(messageDelivery);
066        }
067    
068        /**
069         * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDeliveries()
070         */
071        public Collection<MessageDelivery> getAllMessageDeliveries() {
072            return dao.findAll(MessageDelivery.class);
073        }
074    
075        /**
076         * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDelivery(java.lang.Long)
077         */
078        public MessageDelivery getMessageDelivery(Long id) {
079            Map<String, Object> fields = new HashMap<String, Object>(1);
080            fields.put(MessageDelivery.ID_FIELD, id);
081            return (MessageDelivery) dao.findByPrimaryKey(MessageDelivery.class, fields);
082        }
083    
084        /**
085         * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDeliveryByDelivererSystemId(java.lang.Long)
086         */
087        public MessageDelivery getMessageDeliveryByDelivererSystemId(Long id) {
088            Criteria criteria = new Criteria();
089            criteria.addEqualTo(MessageDelivery.SYSTEMID_FIELD, id);
090            Collection<MessageDelivery> results = dao.findMatching(MessageDelivery.class, criteria);
091            if (results == null || results.size() == 0) return null;
092            if (results.size() > 1) {
093                throw new RuntimeException("More than one message delivery found with the following delivery system id: " + id);
094            }
095            return results.iterator().next();
096        }
097    
098        /**
099         * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDeliveries(org.kuali.rice.kcb.bo.Message)
100         */
101        public Collection<MessageDelivery> getMessageDeliveries(Message message) {
102            Criteria criteria = new Criteria();
103            criteria.addEqualTo(MessageDelivery.MESSAGEID_FIELD, message.getId());
104            return dao.findMatching(MessageDelivery.class, criteria);
105        }
106    
107        /* This method is responsible for atomically finding messagedeliveries, marking them as taken
108         * and returning them to the caller for processing.
109         * NOTE: it is important that this method execute in a SEPARATE dedicated transaction; either the caller should
110         * NOT be wrapped by Spring declarative transaction and this service should be wrapped (which is the case), or
111         * the caller should arrange to invoke this from within a newly created transaction).
112         */
113        public Collection<MessageDelivery> lockAndTakeMessageDeliveries(MessageDeliveryStatus[] statuses) {
114            return lockAndTakeMessageDeliveries(null, statuses);
115        }
116        public Collection<MessageDelivery> lockAndTakeMessageDeliveries(Long messageId, MessageDeliveryStatus[] statuses) {
117            LOG.debug("========>> ENTERING LockAndTakeMessageDeliveries: " + Thread.currentThread());
118            // DO WITHIN TRANSACTION: get all untaken messagedeliveries, and mark as "taken" so no other thread/job takes them
119            // need to think about durability of work list
120    
121            // get all undelivered message deliveries
122            Criteria criteria = new Criteria();
123            criteria.addIsNull(MessageDelivery.LOCKED_DATE);
124            if (messageId != null) {
125                criteria.addEqualTo(MessageDelivery.MESSAGEID_FIELD, messageId);
126            }
127            criteria.addLessThan(MessageDelivery.PROCESS_COUNT, maxProcessAttempts);
128            Collection<String> statusCollection = new ArrayList<String>(statuses.length);
129            for (MessageDeliveryStatus status: statuses) {
130                statusCollection.add(status.name());
131            }
132            criteria.addIn(MessageDelivery.DELIVERY_STATUS, statusCollection);
133            // implement our select for update hack
134            Collection<MessageDelivery> messageDeliveries = dao.findMatching(MessageDelivery.class, criteria, true, RiceConstants.NO_WAIT);
135    
136            //LOG.debug("Retrieved " + messageDeliveries.size() + " available message deliveries: " + System.currentTimeMillis());
137    
138            // mark messageDeliveries as taken
139            for (MessageDelivery delivery: messageDeliveries) {
140                LOG.debug("Took: " + delivery);
141                delivery.setLockedDate(new Timestamp(System.currentTimeMillis()));
142                dao.save(delivery);
143            }
144    
145            LOG.debug("<<=======  LEAVING LockAndTakeMessageDeliveries: " + Thread.currentThread());
146            return messageDeliveries;
147        }
148    }