1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package org.kuali.rice.kcb.service.impl;
17  
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.api.criteria.Predicate;
20  import org.kuali.rice.core.api.criteria.QueryByCriteria;
21  import org.kuali.rice.kcb.bo.Message;
22  import org.kuali.rice.kcb.bo.MessageDelivery;
23  import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
24  import org.kuali.rice.kcb.service.MessageDeliveryService;
25  import org.kuali.rice.krad.data.DataObjectService;
26  
27  import java.sql.Timestamp;
28  import java.util.ArrayList;
29  import java.util.Collection;
30  import java.util.List;
31  
32  import static org.kuali.rice.core.api.criteria.PredicateFactory.*;
33  
34  
35  
36  
37  
38  
39  public class MessageDeliveryServiceImpl implements MessageDeliveryService {
40      private static final Logger LOG = Logger.getLogger(MessageDeliveryServiceImpl.class);
41  
42      private DataObjectService dataObjectService;
43  
44      
45  
46  
47  
48      private int maxProcessAttempts;
49  
50      
51  
52  
53  
54      public void setMaxProcessAttempts(int maxProcessAttempts) {
55          this.maxProcessAttempts = maxProcessAttempts;
56      }
57  
58      
59  
60  
61      public MessageDelivery saveMessageDelivery(MessageDelivery delivery) {
62          return dataObjectService.save(delivery);
63      }
64  
65      
66  
67  
68      public void deleteMessageDelivery(MessageDelivery messageDelivery) {
69          dataObjectService.delete(messageDelivery);
70      }
71  
72      
73  
74  
75      public Collection<MessageDelivery> getAllMessageDeliveries() {
76          return dataObjectService.findMatching(MessageDelivery.class, QueryByCriteria.Builder.create().build()).getResults();
77      }
78  
79      
80  
81  
82      public MessageDelivery getMessageDelivery(Long id) {
83          return dataObjectService.find(MessageDelivery.class, id);
84      }
85  
86      
87  
88  
89      public MessageDelivery getMessageDeliveryByDelivererSystemId(Long id) {
90          QueryByCriteria.Builder criteria = QueryByCriteria.Builder.create();
91  
92          criteria.setPredicates(equal(MessageDelivery.SYSTEMID_FIELD, id));
93          List<MessageDelivery> results = dataObjectService.findMatching(MessageDelivery.class, criteria.build()).getResults();
94  
95          if (results.isEmpty()) {
96              return null;
97          }
98          if (results.size() > 1) {
99              throw new RuntimeException("More than one message delivery found with the following delivery system id: " + id);
100         }
101         return results.get(0);
102     }
103 
104     
105 
106 
107     public Collection<MessageDelivery> getMessageDeliveries(Message message) {
108         QueryByCriteria.Builder criteria = QueryByCriteria.Builder.create();
109         criteria.setPredicates(equal(MessageDelivery.MESSAGEID_FIELD, message.getId()));
110 
111         return dataObjectService.findMatching(MessageDelivery.class, criteria.build()).getResults();
112     }
113 
114     
115 
116 
117 
118 
119 
120     public Collection<MessageDelivery> lockAndTakeMessageDeliveries(MessageDeliveryStatus[] statuses) {
121         return lockAndTakeMessageDeliveries(null, statuses);
122     }
123     public Collection<MessageDelivery> lockAndTakeMessageDeliveries(Long messageId, MessageDeliveryStatus[] statuses) {
124         LOG.debug("========>> ENTERING LockAndTakeMessageDeliveries: " + Thread.currentThread());
125         
126         
127 
128         QueryByCriteria.Builder criteria = QueryByCriteria.Builder.create();
129         List<Predicate> predicates = new ArrayList<Predicate>();
130 
131         predicates.add(isNull(MessageDelivery.LOCKED_DATE));
132         if (messageId != null) {
133             predicates.add(equal(MessageDelivery.MESSAGEID_FIELD + ".id", messageId));
134         }
135         predicates.add(lessThan(MessageDelivery.PROCESS_COUNT, maxProcessAttempts));
136 
137         Collection<String> statusCollection = new ArrayList<String>(statuses.length);
138         for (MessageDeliveryStatus status: statuses) {
139             statusCollection.add(status.name());
140         }
141         predicates.add(in(MessageDelivery.DELIVERY_STATUS, statusCollection));
142         criteria.setPredicates(predicates.toArray(new Predicate[predicates.size()]));
143         List<MessageDelivery> messageDeliveries = dataObjectService.findMatching(MessageDelivery.class, criteria.build()).getResults();
144         List<MessageDelivery> lockedMsgDels = new ArrayList<MessageDelivery>();
145 
146         
147         for (MessageDelivery delivery: messageDeliveries) {
148             LOG.debug("Took: " + delivery);
149             delivery.setLockedDate(new Timestamp(System.currentTimeMillis()));
150             delivery = dataObjectService.save(delivery);
151             lockedMsgDels.add(delivery);
152         }
153 
154         LOG.debug("<<=======  LEAVING LockAndTakeMessageDeliveries: " + Thread.currentThread());
155         return lockedMsgDels;
156     }
157 
158     
159 
160 
161 
162     public void setDataObjectService(DataObjectService dataObjectService) {
163         this.dataObjectService = dataObjectService;
164     }
165 }