View Javadoc

1   /**
2    * Copyright 2005-2012 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kcb.service.impl;
17  
18  import org.apache.log4j.Logger;
19  import org.apache.ojb.broker.query.Criteria;
20  import org.kuali.rice.core.api.util.RiceConstants;
21  import org.kuali.rice.kcb.bo.Message;
22  import org.kuali.rice.kcb.bo.MessageDelivery;
23  import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
24  import org.kuali.rice.kcb.service.MessageDeliveryService;
25  
26  import java.sql.Timestamp;
27  import java.util.ArrayList;
28  import java.util.Collection;
29  import java.util.HashMap;
30  import java.util.Map;
31  
32  /**
33   * MessageDeliveryService implementation 
34   * 
35   * @author Kuali Rice Team (rice.collab@kuali.org)
36   */
37  public class MessageDeliveryServiceImpl extends BusinessObjectServiceImpl implements MessageDeliveryService {
38      private static final Logger LOG = Logger.getLogger(MessageDeliveryServiceImpl.class);
39  
40      /**
41       * Number of processing attempts to make.  {@link MessageDelivery}s with this number or more of attempts
42       * will not be selected for further processing.
43       */
44      private int maxProcessAttempts;
45  
46      /**
47       * Sets the max processing attempts
48       * @param maxProcessAttempts the max delivery attempts
49       */
50      public void setMaxProcessAttempts(int maxProcessAttempts) {
51          this.maxProcessAttempts = maxProcessAttempts;
52      }
53  
54      /**
55       * @see org.kuali.rice.kcb.service.MessageDeliveryService#saveMessageDelivery(org.kuali.rice.kcb.bo.MessageDelivery)
56       */
57      public void saveMessageDelivery(MessageDelivery delivery) {
58          dao.save(delivery);
59      }
60  
61      /**
62       * @see org.kuali.rice.kcb.service.MessageDeliveryService#deleteMessageDelivery(java.lang.Long)
63       */
64      public void deleteMessageDelivery(MessageDelivery messageDelivery) {
65          dao.delete(messageDelivery);
66      }
67  
68      /**
69       * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDeliveries()
70       */
71      public Collection<MessageDelivery> getAllMessageDeliveries() {
72          return dao.findAll(MessageDelivery.class);
73      }
74  
75      /**
76       * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDelivery(java.lang.Long)
77       */
78      public MessageDelivery getMessageDelivery(Long id) {
79          Map<String, Object> fields = new HashMap<String, Object>(1);
80          fields.put(MessageDelivery.ID_FIELD, id);
81          return (MessageDelivery) dao.findByPrimaryKey(MessageDelivery.class, fields);
82      }
83  
84      /**
85       * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDeliveryByDelivererSystemId(java.lang.Long)
86       */
87      public MessageDelivery getMessageDeliveryByDelivererSystemId(Long id) {
88          Criteria criteria = new Criteria();
89          criteria.addEqualTo(MessageDelivery.SYSTEMID_FIELD, id);
90          Collection<MessageDelivery> results = dao.findMatching(MessageDelivery.class, criteria);
91          if (results == null || results.size() == 0) return null;
92          if (results.size() > 1) {
93              throw new RuntimeException("More than one message delivery found with the following delivery system id: " + id);
94          }
95          return results.iterator().next();
96      }
97  
98      /**
99       * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDeliveries(org.kuali.rice.kcb.bo.Message)
100      */
101     public Collection<MessageDelivery> getMessageDeliveries(Message message) {
102         Criteria criteria = new Criteria();
103         criteria.addEqualTo(MessageDelivery.MESSAGEID_FIELD, message.getId());
104         return dao.findMatching(MessageDelivery.class, criteria);
105     }
106 
107     /* This method is responsible for atomically finding messagedeliveries, marking them as taken
108      * and returning them to the caller for processing.
109      * NOTE: it is important that this method execute in a SEPARATE dedicated transaction; either the caller should
110      * NOT be wrapped by Spring declarative transaction and this service should be wrapped (which is the case), or
111      * the caller should arrange to invoke this from within a newly created transaction).
112      */
113     public Collection<MessageDelivery> lockAndTakeMessageDeliveries(MessageDeliveryStatus[] statuses) {
114         return lockAndTakeMessageDeliveries(null, statuses);
115     }
116     public Collection<MessageDelivery> lockAndTakeMessageDeliveries(Long messageId, MessageDeliveryStatus[] statuses) {
117         LOG.debug("========>> ENTERING LockAndTakeMessageDeliveries: " + Thread.currentThread());
118         // DO WITHIN TRANSACTION: get all untaken messagedeliveries, and mark as "taken" so no other thread/job takes them
119         // need to think about durability of work list
120 
121         // get all undelivered message deliveries
122         Criteria criteria = new Criteria();
123         criteria.addIsNull(MessageDelivery.LOCKED_DATE);
124         if (messageId != null) {
125             criteria.addEqualTo(MessageDelivery.MESSAGEID_FIELD, messageId);
126         }
127         criteria.addLessThan(MessageDelivery.PROCESS_COUNT, maxProcessAttempts);
128         Collection<String> statusCollection = new ArrayList<String>(statuses.length);
129         for (MessageDeliveryStatus status: statuses) {
130             statusCollection.add(status.name());
131         }
132         criteria.addIn(MessageDelivery.DELIVERY_STATUS, statusCollection);
133         // implement our select for update hack
134         Collection<MessageDelivery> messageDeliveries = dao.findMatching(MessageDelivery.class, criteria, true, RiceConstants.NO_WAIT);
135 
136         //LOG.debug("Retrieved " + messageDeliveries.size() + " available message deliveries: " + System.currentTimeMillis());
137 
138         // mark messageDeliveries as taken
139         for (MessageDelivery delivery: messageDeliveries) {
140             LOG.debug("Took: " + delivery);
141             delivery.setLockedDate(new Timestamp(System.currentTimeMillis()));
142             dao.save(delivery);
143         }
144 
145         LOG.debug("<<=======  LEAVING LockAndTakeMessageDeliveries: " + Thread.currentThread());
146         return messageDeliveries;
147     }
148 }