View Javadoc
1   /**
2    * Copyright 2005-2015 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kcb.service.impl;
17  
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.api.criteria.Predicate;
20  import org.kuali.rice.core.api.criteria.QueryByCriteria;
21  import org.kuali.rice.kcb.bo.Message;
22  import org.kuali.rice.kcb.bo.MessageDelivery;
23  import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
24  import org.kuali.rice.kcb.service.MessageDeliveryService;
25  import org.kuali.rice.krad.data.DataObjectService;
26  
27  import java.sql.Timestamp;
28  import java.util.ArrayList;
29  import java.util.Collection;
30  import java.util.List;
31  
32  import static org.kuali.rice.core.api.criteria.PredicateFactory.*;
33  
34  /**
35   * MessageDeliveryService implementation 
36   * 
37   * @author Kuali Rice Team (rice.collab@kuali.org)
38   */
39  public class MessageDeliveryServiceImpl implements MessageDeliveryService {
40      private static final Logger LOG = Logger.getLogger(MessageDeliveryServiceImpl.class);
41  
42      private DataObjectService dataObjectService;
43  
44      /**
45       * Number of processing attempts to make.  {@link MessageDelivery}s with this number or more of attempts
46       * will not be selected for further processing.
47       */
48      private int maxProcessAttempts;
49  
50      /**
51       * Sets the max processing attempts
52       * @param maxProcessAttempts the max delivery attempts
53       */
54      public void setMaxProcessAttempts(int maxProcessAttempts) {
55          this.maxProcessAttempts = maxProcessAttempts;
56      }
57  
58      /**
59       * @see org.kuali.rice.kcb.service.MessageDeliveryService#saveMessageDelivery(org.kuali.rice.kcb.bo.MessageDelivery)
60       */
61      public MessageDelivery saveMessageDelivery(MessageDelivery delivery) {
62          return dataObjectService.save(delivery);
63      }
64  
65      /**
66       * @see org.kuali.rice.kcb.service.MessageDeliveryService#deleteMessageDelivery(MessageDelivery)
67       */
68      public void deleteMessageDelivery(MessageDelivery messageDelivery) {
69          dataObjectService.delete(messageDelivery);
70      }
71  
72      /**
73       * @see org.kuali.rice.kcb.service.MessageDeliveryService#getAllMessageDeliveries()
74       */
75      public Collection<MessageDelivery> getAllMessageDeliveries() {
76          return dataObjectService.findMatching(MessageDelivery.class, QueryByCriteria.Builder.create().build()).getResults();
77      }
78  
79      /**
80       * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDelivery(java.lang.Long)
81       */
82      public MessageDelivery getMessageDelivery(Long id) {
83          return dataObjectService.find(MessageDelivery.class, id);
84      }
85  
86      /**
87       * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDeliveryByDelivererSystemId(java.lang.Long)
88       */
89      public MessageDelivery getMessageDeliveryByDelivererSystemId(Long id) {
90          QueryByCriteria.Builder criteria = QueryByCriteria.Builder.create();
91  
92          criteria.setPredicates(equal(MessageDelivery.SYSTEMID_FIELD, id));
93          List<MessageDelivery> results = dataObjectService.findMatching(MessageDelivery.class, criteria.build()).getResults();
94  
95          if (results.isEmpty()) {
96              return null;
97          }
98          if (results.size() > 1) {
99              throw new RuntimeException("More than one message delivery found with the following delivery system id: " + id);
100         }
101         return results.get(0);
102     }
103 
104     /**
105      * @see org.kuali.rice.kcb.service.MessageDeliveryService#getMessageDeliveries(org.kuali.rice.kcb.bo.Message)
106      */
107     public Collection<MessageDelivery> getMessageDeliveries(Message message) {
108         QueryByCriteria.Builder criteria = QueryByCriteria.Builder.create();
109         criteria.setPredicates(equal(MessageDelivery.MESSAGEID_FIELD, message.getId()));
110 
111         return dataObjectService.findMatching(MessageDelivery.class, criteria.build()).getResults();
112     }
113 
114     /* This method is responsible for atomically finding messagedeliveries, marking them as taken
115      * and returning them to the caller for processing.
116      * NOTE: it is important that this method execute in a SEPARATE dedicated transaction; either the caller should
117      * NOT be wrapped by Spring declarative transaction and this service should be wrapped (which is the case), or
118      * the caller should arrange to invoke this from within a newly created transaction).
119      */
120     public Collection<MessageDelivery> lockAndTakeMessageDeliveries(MessageDeliveryStatus[] statuses) {
121         return lockAndTakeMessageDeliveries(null, statuses);
122     }
123     public Collection<MessageDelivery> lockAndTakeMessageDeliveries(Long messageId, MessageDeliveryStatus[] statuses) {
124         LOG.debug("========>> ENTERING LockAndTakeMessageDeliveries: " + Thread.currentThread());
125         // DO WITHIN TRANSACTION: get all untaken messagedeliveries, and mark as "taken" so no other thread/job takes them
126         // need to think about durability of work list
127 
128         QueryByCriteria.Builder criteria = QueryByCriteria.Builder.create();
129         List<Predicate> predicates = new ArrayList<Predicate>();
130 
131         predicates.add(isNull(MessageDelivery.LOCKED_DATE));
132         if (messageId != null) {
133             predicates.add(equal(MessageDelivery.MESSAGEID_FIELD + ".id", messageId));
134         }
135         predicates.add(lessThan(MessageDelivery.PROCESS_COUNT, maxProcessAttempts));
136 
137         Collection<String> statusCollection = new ArrayList<String>(statuses.length);
138         for (MessageDeliveryStatus status: statuses) {
139             statusCollection.add(status.name());
140         }
141         predicates.add(in(MessageDelivery.DELIVERY_STATUS, statusCollection));
142         criteria.setPredicates(predicates.toArray(new Predicate[predicates.size()]));
143         List<MessageDelivery> messageDeliveries = dataObjectService.findMatching(MessageDelivery.class, criteria.build()).getResults();
144         List<MessageDelivery> lockedMsgDels = new ArrayList<MessageDelivery>();
145 
146         // mark messageDeliveries as taken
147         for (MessageDelivery delivery: messageDeliveries) {
148             LOG.debug("Took: " + delivery);
149             delivery.setLockedDate(new Timestamp(System.currentTimeMillis()));
150             delivery = dataObjectService.save(delivery);
151             lockedMsgDels.add(delivery);
152         }
153 
154         LOG.debug("<<=======  LEAVING LockAndTakeMessageDeliveries: " + Thread.currentThread());
155         return lockedMsgDels;
156     }
157 
158     /**
159      * Sets the data object service.
160      * @param dataObjectService service to persist data to the datasource
161      */
162     public void setDataObjectService(DataObjectService dataObjectService) {
163         this.dataObjectService = dataObjectService;
164     }
165 }