Coverage Report - org.kuali.rice.kcb.service.impl.MessageDeliveryServiceImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
MessageDeliveryServiceImpl
0%
0/40
0%
0/12
1.889
 
 1  
 /*
 2  
  * Copyright 2007-2008 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.kuali.rice.kcb.service.impl;
 17  
 
 18  
 import java.sql.Timestamp;
 19  
 import java.util.ArrayList;
 20  
 import java.util.Collection;
 21  
 import java.util.HashMap;
 22  
 import java.util.Map;
 23  
 
 24  
 import org.apache.log4j.Logger;
 25  
 import org.apache.ojb.broker.query.Criteria;
 26  
 import org.kuali.rice.core.util.RiceConstants;
 27  
 import org.kuali.rice.kcb.bo.Message;
 28  
 import org.kuali.rice.kcb.bo.MessageDelivery;
 29  
 import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
 30  
 import org.kuali.rice.kcb.service.MessageDeliveryService;
 31  
 
 32  
 /**
 33  
  * MessageDeliveryService implementation 
 34  
  * 
 35  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 36  
  */
 37  0
 public class MessageDeliveryServiceImpl extends BusinessObjectServiceImpl implements MessageDeliveryService {
 38  0
     private static final Logger LOG = Logger.getLogger(MessageDeliveryServiceImpl.class);
 39  
 
 40  
     /**
 41  
      * Number of processing attempts to make.  {@link MessageDelivery}s with this number or more of attempts
 42  
      * will not be selected for further processing.
 43  
      */
 44  
     private int maxProcessAttempts;
 45  
 
 46  
     /**
 47  
      * Sets the max processing attempts
 48  
      * @param maxProcessAttempts the max delivery attempts
 49  
      */
 50  
     public void setMaxProcessAttempts(int maxProcessAttempts) {
 51  0
         this.maxProcessAttempts = maxProcessAttempts;
 52  0
     }
 53  
 
 54  
     /**
 55  
      * @see org.kuali.rice.kcb.service.MessageDeliveryService#saveMessageDelivery(org.kuali.rice.kcb.bo.MessageDelivery)
 56  
      */
 57  
     public void saveMessageDelivery(MessageDelivery delivery) {
 58  0
         dao.save(delivery);
 59  0
     }
 60  
 
 61  
     /**
 62  
      * @see org.kuali.rice.kcb.service.MessageDeliveryService#deleteMessageDelivery(java.lang.Long)
 63  
      */
 64  
     public void deleteMessageDelivery(MessageDelivery messageDelivery) {
 65  0
         dao.delete(messageDelivery);
 66  0
     }
 67  
 
 68  
     /**
 69  
      * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDeliveries()
 70  
      */
 71  
     public Collection<MessageDelivery> getAllMessageDeliveries() {
 72  0
         return dao.findAll(MessageDelivery.class);
 73  
     }
 74  
 
 75  
     /**
 76  
      * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDelivery(java.lang.Long)
 77  
      */
 78  
     public MessageDelivery getMessageDelivery(Long id) {
 79  0
         Map<String, Object> fields = new HashMap<String, Object>(1);
 80  0
         fields.put(MessageDelivery.ID_FIELD, id);
 81  0
         return (MessageDelivery) dao.findByPrimaryKey(MessageDelivery.class, fields);
 82  
     }
 83  
 
 84  
     /**
 85  
      * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDeliveryByDelivererSystemId(java.lang.Long)
 86  
      */
 87  
     public MessageDelivery getMessageDeliveryByDelivererSystemId(Long id) {
 88  0
         Criteria criteria = new Criteria();
 89  0
         criteria.addEqualTo(MessageDelivery.SYSTEMID_FIELD, id);
 90  0
         Collection<MessageDelivery> results = dao.findMatching(MessageDelivery.class, criteria);
 91  0
         if (results == null || results.size() == 0) return null;
 92  0
         if (results.size() > 1) {
 93  0
             throw new RuntimeException("More than one message delivery found with the following delivery system id: " + id);
 94  
         }
 95  0
         return results.iterator().next();
 96  
     }
 97  
 
 98  
     /**
 99  
      * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDeliveries(org.kuali.rice.kcb.bo.Message)
 100  
      */
 101  
     public Collection<MessageDelivery> getMessageDeliveries(Message message) {
 102  0
         Criteria criteria = new Criteria();
 103  0
         criteria.addEqualTo(MessageDelivery.MESSAGEID_FIELD, message.getId());
 104  0
         return dao.findMatching(MessageDelivery.class, criteria);
 105  
     }
 106  
 
 107  
     /* This method is responsible for atomically finding messagedeliveries, marking them as taken
 108  
      * and returning them to the caller for processing.
 109  
      * NOTE: it is important that this method execute in a SEPARATE dedicated transaction; either the caller should
 110  
      * NOT be wrapped by Spring declarative transaction and this service should be wrapped (which is the case), or
 111  
      * the caller should arrange to invoke this from within a newly created transaction).
 112  
      */
 113  
     public Collection<MessageDelivery> lockAndTakeMessageDeliveries(MessageDeliveryStatus[] statuses) {
 114  0
         return lockAndTakeMessageDeliveries(null, statuses);
 115  
     }
 116  
     public Collection<MessageDelivery> lockAndTakeMessageDeliveries(Long messageId, MessageDeliveryStatus[] statuses) {
 117  0
         LOG.debug("========>> ENTERING LockAndTakeMessageDeliveries: " + Thread.currentThread());
 118  
         // DO WITHIN TRANSACTION: get all untaken messagedeliveries, and mark as "taken" so no other thread/job takes them
 119  
         // need to think about durability of work list
 120  
 
 121  
         // get all undelivered message deliveries
 122  0
         Criteria criteria = new Criteria();
 123  0
         criteria.addIsNull(MessageDelivery.LOCKED_DATE);
 124  0
         if (messageId != null) {
 125  0
             criteria.addEqualTo(MessageDelivery.MESSAGEID_FIELD, messageId);
 126  
         }
 127  0
         criteria.addLessThan(MessageDelivery.PROCESS_COUNT, maxProcessAttempts);
 128  0
         Collection<String> statusCollection = new ArrayList<String>(statuses.length);
 129  0
         for (MessageDeliveryStatus status: statuses) {
 130  0
             statusCollection.add(status.name());
 131  
         }
 132  0
         criteria.addIn(MessageDelivery.DELIVERY_STATUS, statusCollection);
 133  
         // implement our select for update hack
 134  0
         Collection<MessageDelivery> messageDeliveries = dao.findMatching(MessageDelivery.class, criteria, true, RiceConstants.NO_WAIT);
 135  
 
 136  
         //LOG.debug("Retrieved " + messageDeliveries.size() + " available message deliveries: " + System.currentTimeMillis());
 137  
 
 138  
         // mark messageDeliveries as taken
 139  0
         for (MessageDelivery delivery: messageDeliveries) {
 140  0
             LOG.debug("Took: " + delivery);
 141  0
             delivery.setLockedDate(new Timestamp(System.currentTimeMillis()));
 142  0
             dao.save(delivery);
 143  
         }
 144  
 
 145  0
         LOG.debug("<<=======  LEAVING LockAndTakeMessageDeliveries: " + Thread.currentThread());
 146  0
         return messageDeliveries;
 147  
     }
 148  
 }