1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kcb.service.impl;
17
18 import org.apache.log4j.Logger;
19 import org.apache.ojb.broker.query.Criteria;
20 import org.kuali.rice.core.api.util.RiceConstants;
21 import org.kuali.rice.kcb.bo.Message;
22 import org.kuali.rice.kcb.bo.MessageDelivery;
23 import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
24 import org.kuali.rice.kcb.service.MessageDeliveryService;
25
26 import java.sql.Timestamp;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.HashMap;
30 import java.util.Map;
31
32
33
34
35
36
37 public class MessageDeliveryServiceImpl extends BusinessObjectServiceImpl implements MessageDeliveryService {
38 private static final Logger LOG = Logger.getLogger(MessageDeliveryServiceImpl.class);
39
40
41
42
43
44 private int maxProcessAttempts;
45
46
47
48
49
50 public void setMaxProcessAttempts(int maxProcessAttempts) {
51 this.maxProcessAttempts = maxProcessAttempts;
52 }
53
54
55
56
57 public void saveMessageDelivery(MessageDelivery delivery) {
58 dao.save(delivery);
59 }
60
61
62
63
64 public void deleteMessageDelivery(MessageDelivery messageDelivery) {
65 dao.delete(messageDelivery);
66 }
67
68
69
70
71 public Collection<MessageDelivery> getAllMessageDeliveries() {
72 return dao.findAll(MessageDelivery.class);
73 }
74
75
76
77
78 public MessageDelivery getMessageDelivery(Long id) {
79 Map<String, Object> fields = new HashMap<String, Object>(1);
80 fields.put(MessageDelivery.ID_FIELD, id);
81 return (MessageDelivery) dao.findByPrimaryKey(MessageDelivery.class, fields);
82 }
83
84
85
86
87 public MessageDelivery getMessageDeliveryByDelivererSystemId(Long id) {
88 Criteria criteria = new Criteria();
89 criteria.addEqualTo(MessageDelivery.SYSTEMID_FIELD, id);
90 Collection<MessageDelivery> results = dao.findMatching(MessageDelivery.class, criteria);
91 if (results == null || results.size() == 0) return null;
92 if (results.size() > 1) {
93 throw new RuntimeException("More than one message delivery found with the following delivery system id: " + id);
94 }
95 return results.iterator().next();
96 }
97
98
99
100
101 public Collection<MessageDelivery> getMessageDeliveries(Message message) {
102 Criteria criteria = new Criteria();
103 criteria.addEqualTo(MessageDelivery.MESSAGEID_FIELD, message.getId());
104 return dao.findMatching(MessageDelivery.class, criteria);
105 }
106
107
108
109
110
111
112
113 public Collection<MessageDelivery> lockAndTakeMessageDeliveries(MessageDeliveryStatus[] statuses) {
114 return lockAndTakeMessageDeliveries(null, statuses);
115 }
116 public Collection<MessageDelivery> lockAndTakeMessageDeliveries(Long messageId, MessageDeliveryStatus[] statuses) {
117 LOG.debug("========>> ENTERING LockAndTakeMessageDeliveries: " + Thread.currentThread());
118
119
120
121
122 Criteria criteria = new Criteria();
123 criteria.addIsNull(MessageDelivery.LOCKED_DATE);
124 if (messageId != null) {
125 criteria.addEqualTo(MessageDelivery.MESSAGEID_FIELD, messageId);
126 }
127 criteria.addLessThan(MessageDelivery.PROCESS_COUNT, maxProcessAttempts);
128 Collection<String> statusCollection = new ArrayList<String>(statuses.length);
129 for (MessageDeliveryStatus status: statuses) {
130 statusCollection.add(status.name());
131 }
132 criteria.addIn(MessageDelivery.DELIVERY_STATUS, statusCollection);
133
134 Collection<MessageDelivery> messageDeliveries = dao.findMatching(MessageDelivery.class, criteria, true, RiceConstants.NO_WAIT);
135
136
137
138
139 for (MessageDelivery delivery: messageDeliveries) {
140 LOG.debug("Took: " + delivery);
141 delivery.setLockedDate(new Timestamp(System.currentTimeMillis()));
142 dao.save(delivery);
143 }
144
145 LOG.debug("<<======= LEAVING LockAndTakeMessageDeliveries: " + Thread.currentThread());
146 return messageDeliveries;
147 }
148 }