1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kcb.service.impl;
17
18 import org.apache.log4j.Logger;
19 import org.kuali.rice.core.api.criteria.Predicate;
20 import org.kuali.rice.core.api.criteria.QueryByCriteria;
21 import org.kuali.rice.kcb.bo.Message;
22 import org.kuali.rice.kcb.bo.MessageDelivery;
23 import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
24 import org.kuali.rice.kcb.service.MessageDeliveryService;
25 import org.kuali.rice.krad.data.DataObjectService;
26
27 import java.sql.Timestamp;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.List;
31
32 import static org.kuali.rice.core.api.criteria.PredicateFactory.*;
33
34
35
36
37
38
39 public class MessageDeliveryServiceImpl implements MessageDeliveryService {
40 private static final Logger LOG = Logger.getLogger(MessageDeliveryServiceImpl.class);
41
42 private DataObjectService dataObjectService;
43
44
45
46
47
48 private int maxProcessAttempts;
49
50
51
52
53
54 public void setMaxProcessAttempts(int maxProcessAttempts) {
55 this.maxProcessAttempts = maxProcessAttempts;
56 }
57
58
59
60
61 public MessageDelivery saveMessageDelivery(MessageDelivery delivery) {
62 return dataObjectService.save(delivery);
63 }
64
65
66
67
68 public void deleteMessageDelivery(MessageDelivery messageDelivery) {
69 dataObjectService.delete(messageDelivery);
70 }
71
72
73
74
75 public Collection<MessageDelivery> getAllMessageDeliveries() {
76 return dataObjectService.findMatching(MessageDelivery.class, QueryByCriteria.Builder.create().build()).getResults();
77 }
78
79
80
81
82 public MessageDelivery getMessageDelivery(Long id) {
83 return dataObjectService.find(MessageDelivery.class, id);
84 }
85
86
87
88
89 public MessageDelivery getMessageDeliveryByDelivererSystemId(Long id) {
90 QueryByCriteria.Builder criteria = QueryByCriteria.Builder.create();
91
92 criteria.setPredicates(equal(MessageDelivery.SYSTEMID_FIELD, id));
93 List<MessageDelivery> results = dataObjectService.findMatching(MessageDelivery.class, criteria.build()).getResults();
94
95 if (results.isEmpty()) {
96 return null;
97 }
98 if (results.size() > 1) {
99 throw new RuntimeException("More than one message delivery found with the following delivery system id: " + id);
100 }
101 return results.get(0);
102 }
103
104
105
106
107 public Collection<MessageDelivery> getMessageDeliveries(Message message) {
108 QueryByCriteria.Builder criteria = QueryByCriteria.Builder.create();
109 criteria.setPredicates(equal(MessageDelivery.MESSAGEID_FIELD, message.getId()));
110
111 return dataObjectService.findMatching(MessageDelivery.class, criteria.build()).getResults();
112 }
113
114
115
116
117
118
119
120 public Collection<MessageDelivery> lockAndTakeMessageDeliveries(MessageDeliveryStatus[] statuses) {
121 return lockAndTakeMessageDeliveries(null, statuses);
122 }
123 public Collection<MessageDelivery> lockAndTakeMessageDeliveries(Long messageId, MessageDeliveryStatus[] statuses) {
124 LOG.debug("========>> ENTERING LockAndTakeMessageDeliveries: " + Thread.currentThread());
125
126
127
128 QueryByCriteria.Builder criteria = QueryByCriteria.Builder.create();
129 List<Predicate> predicates = new ArrayList<Predicate>();
130
131 predicates.add(isNull(MessageDelivery.LOCKED_DATE));
132 if (messageId != null) {
133 predicates.add(equal(MessageDelivery.MESSAGEID_FIELD + ".id", messageId));
134 }
135 predicates.add(lessThan(MessageDelivery.PROCESS_COUNT, maxProcessAttempts));
136
137 Collection<String> statusCollection = new ArrayList<String>(statuses.length);
138 for (MessageDeliveryStatus status: statuses) {
139 statusCollection.add(status.name());
140 }
141 predicates.add(in(MessageDelivery.DELIVERY_STATUS, statusCollection));
142 criteria.setPredicates(predicates.toArray(new Predicate[predicates.size()]));
143 List<MessageDelivery> messageDeliveries = dataObjectService.findMatching(MessageDelivery.class, criteria.build()).getResults();
144 List<MessageDelivery> lockedMsgDels = new ArrayList<MessageDelivery>();
145
146
147 for (MessageDelivery delivery: messageDeliveries) {
148 LOG.debug("Took: " + delivery);
149 delivery.setLockedDate(new Timestamp(System.currentTimeMillis()));
150 delivery = dataObjectService.save(delivery);
151 lockedMsgDels.add(delivery);
152 }
153
154 LOG.debug("<<======= LEAVING LockAndTakeMessageDeliveries: " + Thread.currentThread());
155 return lockedMsgDels;
156 }
157
158
159
160
161
162 public void setDataObjectService(DataObjectService dataObjectService) {
163 this.dataObjectService = dataObjectService;
164 }
165 }