1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kcb.quartz;
17
18 import org.apache.log4j.Logger;
19 import org.kuali.rice.core.framework.persistence.dao.GenericDao;
20 import org.kuali.rice.kcb.bo.Message;
21 import org.kuali.rice.kcb.bo.MessageDelivery;
22 import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
23 import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
24 import org.kuali.rice.kcb.deliverer.MessageDeliverer;
25 import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException;
26 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
27 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
28 import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
29 import org.kuali.rice.kcb.service.MessageDeliveryService;
30 import org.kuali.rice.kcb.service.MessageService;
31 import org.kuali.rice.krad.data.DataObjectService;
32 import org.kuali.rice.krad.service.KRADServiceLocator;
33 import org.quartz.JobExecutionContext;
34 import org.quartz.JobExecutionException;
35 import org.quartz.StatefulJob;
36 import org.springframework.beans.factory.annotation.Required;
37
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.HashMap;
41 import java.util.HashSet;
42 import java.util.LinkedList;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Set;
46
47
48
49
50
51
52
53 public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
54 public static final String NAME = "MessageProcessingJobDetail";
55 public static final String GROUP = "KCB-Delivery";
56
57 public static enum Mode {
58 DELIVER, REMOVE
59 }
60
61 private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class);
62
63 private DataObjectService dataObjectService;
64 private MessageDelivererRegistryService registry;
65 private MessageDeliveryService messageDeliveryService;
66 private Long messageId;
67 private Mode mode = null;
68 private String user;
69 private String cause;
70
71 public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) {
72 this();
73 this.messageId = messageId;
74 this.mode = mode;
75 this.user = user;
76 this.cause = cause;
77 }
78
79
80 public MessageProcessingJob() {
81 registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
82 messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
83 txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
84 dataObjectService = KRADServiceLocator.getDataObjectService();
85 }
86
87
88
89
90
91 public void setDataObjectService(DataObjectService dataObjectService) {
92 this.dataObjectService = dataObjectService;
93 }
94
95
96
97
98
99 @Required
100 public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) {
101 this.registry = registry;
102 }
103
104
105
106
107
108 @Required
109 public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
110 this.messageDeliveryService = messageDeliveryService;
111 }
112
113 @Override
114 protected Collection<MessageDelivery> takeAvailableWorkItems() {
115 MessageDeliveryStatus[] statuses;
116 switch (mode) {
117 case DELIVER: {
118 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED };
119 break;
120 }
121 case REMOVE: {
122 if (messageId == null) {
123 throw new IllegalStateException("Message id must be specified for message removal mode");
124 }
125 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED };
126 break;
127 }
128 default:
129 throw new RuntimeException("Invalid mode: " + mode);
130 }
131 for (MessageDeliveryStatus status: statuses) {
132 LOG.debug("Taking message deliveries with status: " + status);
133 }
134 Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses);
135 LOG.debug("Took " + ds.size() + " deliveries");
136 for (MessageDelivery md: ds) {
137 LOG.debug(md);
138 md.setProcessCount(md.getProcessCount().intValue() + 1);
139 }
140 return ds;
141 }
142
143 @Override
144 protected void unlockWorkItem(MessageDelivery item) {
145 item.setLockedDate(null);
146 dataObjectService.save(item);
147 }
148
149
150
151
152
153
154 @Override
155 protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) {
156 Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size());
157
158 Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>();
159 for (MessageDelivery messageDelivery: workItems) {
160
161 MessageDeliverer deliverer = registry.getDeliverer(messageDelivery);
162 if (deliverer == null) {
163 LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery);
164 result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery"));
165 unlockWorkItemAtomically(messageDelivery);
166 continue;
167 }
168
169 if (deliverer instanceof BulkMessageDeliverer) {
170
171 String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
172 Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key);
173 if (workUnit == null) {
174 workUnit = new LinkedList<MessageDelivery>();
175 bulkWorkUnits.put(key, workUnit);
176 }
177 workUnit.add(messageDelivery);
178 } else {
179 ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1);
180 l.add(messageDelivery);
181 groupedWorkItems.add(l);
182 }
183 }
184
185 return groupedWorkItems;
186 }
187
188
189 @Override
190 protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) {
191 MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next();
192
193 MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery);
194 if (messageDeliverer == null) {
195 throw new RuntimeException("Message deliverer could not be obtained");
196 }
197
198 if (messageDeliveries.size() > 1) {
199
200 if (!(messageDeliverer instanceof BulkMessageDeliverer)) {
201 throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
202 }
203 return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode);
204 } else {
205 return process(messageDeliverer, firstMessageDelivery, mode);
206 }
207 }
208
209
210
211
212
213
214
215 protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
216
217 try {
218 if (mode == Mode.DELIVER) {
219 messageDeliverer.deliver(messageDelivery);
220
221 messageDelivery.setProcessCount(Integer.valueOf(0));
222
223
224
225 updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
226 } else {
227 messageDeliverer.dismiss(messageDelivery, user, cause);
228
229 messageDeliveryService.deleteMessageDelivery(messageDelivery);
230 }
231 } catch (MessageDeliveryProcessingException nmde) {
232 LOG.error("Error processing message delivery " + messageDelivery, nmde);
233 throw new RuntimeException(nmde);
234 }
235
236 LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed.");
237
238
239 List<MessageDelivery> success = new ArrayList<MessageDelivery>(1);
240 success.add(messageDelivery);
241 return success;
242 }
243
244
245
246
247
248
249
250 protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries, Mode mode) {
251 MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
252
253 try {
254 if (mode == Mode.DELIVER) {
255 messageDeliverer.bulkDeliver(messageDeliveries);
256 } else {
257 messageDeliverer.bulkDismiss(messageDeliveries);
258 }
259 } catch (MessageDeliveryProcessingException nmde) {
260 LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde);
261 throw new RuntimeException(nmde);
262 }
263
264
265
266
267 List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size());
268 for (MessageDelivery nmd: messageDeliveries) {
269 successes.add(nmd);
270 LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered.");
271
272 if (mode == Mode.REMOVE) {
273 messageDeliveryService.deleteMessageDelivery(nmd);
274 } else {
275 nmd.setProcessCount(0);
276 updateStatusAndUnlock(nmd, targetStatus);
277 }
278 }
279
280 return successes;
281 }
282
283 @Override
284 protected void finishProcessing(ProcessingResult<MessageDelivery> result) {
285 LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures");
286 Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size());
287 for (MessageDelivery md: result.getSuccesses()) {
288 messageIds.add(md.getMessage().getId());
289 }
290 MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService();
291 for (Long id: messageIds) {
292 LOG.debug("Finishing processing message " + id);
293
294
295 Message m = ms.getMessage(id);
296
297 Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m);
298 if (c.size() == 0) {
299 LOG.debug("Deleting message " + m);
300 ms.deleteMessage(m);
301 } else {
302 LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries");
303 for (MessageDelivery md: c) {
304 LOG.debug(md);
305 }
306 }
307 }
308 }
309
310
311
312
313
314 protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) {
315 messageDelivery.setDeliveryStatus(status);
316
317 messageDelivery.setLockedDate(null);
318 dataObjectService.save(messageDelivery);
319 }
320
321 @Override
322 public ProcessingResult<MessageDelivery> run() {
323 LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause);
324 return super.run();
325 }
326
327 public void execute(JobExecutionContext context) throws JobExecutionException {
328 String mode = context.getMergedJobDataMap().getString("mode");
329 if (mode != null) {
330 this.mode = Mode.valueOf(mode);
331 } else {
332 this.mode = Mode.DELIVER;
333 }
334 this.user = context.getMergedJobDataMap().getString("user");
335 this.cause = context.getMergedJobDataMap().getString("cause");
336 if (context.getMergedJobDataMap().containsKey("messageId")) {
337 this.messageId = context.getMergedJobDataMap().getLong("messageId");
338 }
339 LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "====");
340 super.run();
341 }
342 }