1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kcb.quartz;
17
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26
27 import org.apache.log4j.Logger;
28 import org.kuali.rice.core.dao.GenericDao;
29 import org.kuali.rice.kcb.bo.Message;
30 import org.kuali.rice.kcb.bo.MessageDelivery;
31 import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
32 import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
33 import org.kuali.rice.kcb.deliverer.MessageDeliverer;
34 import org.kuali.rice.kcb.exception.MessageDeliveryProcessingException;
35 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
36 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
37 import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
38 import org.kuali.rice.kcb.service.MessageDeliveryService;
39 import org.kuali.rice.kcb.service.MessageService;
40 import org.quartz.JobExecutionContext;
41 import org.quartz.JobExecutionException;
42 import org.quartz.StatefulJob;
43 import org.springframework.beans.factory.annotation.Required;
44
45
46
47
48
49
50
51 public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
52 public static final String NAME = "MessageProcessingJobDetail";
53 public static final String GROUP = "KCB-Delivery";
54
55 public static enum Mode {
56 DELIVER, REMOVE
57 }
58
59 private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class);
60
61 private GenericDao dao;
62 private MessageDelivererRegistryService registry;
63 private MessageDeliveryService messageDeliveryService;
64 private Long messageId;
65 private Mode mode = null;
66 private String user;
67 private String cause;
68
69 public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) {
70 this();
71 this.messageId = messageId;
72 this.mode = mode;
73 this.user = user;
74 this.cause = cause;
75 }
76
77
78 public MessageProcessingJob() {
79 dao = GlobalKCBServiceLocator.getInstance().getKcbGenericDao();
80 registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
81 messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
82 txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
83 }
84
85
86
87
88
89 @Required
90 public void setGenericDao(GenericDao dao) {
91 this.dao = dao;
92 }
93
94
95
96
97
98 @Required
99 public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) {
100 this.registry = registry;
101 }
102
103
104
105
106
107 @Required
108 public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
109 this.messageDeliveryService = messageDeliveryService;
110 }
111
112 @Override
113 protected Collection<MessageDelivery> takeAvailableWorkItems() {
114 MessageDeliveryStatus[] statuses;
115 switch (mode) {
116 case DELIVER: {
117 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED };
118 break;
119 }
120 case REMOVE: {
121 if (messageId == null) {
122 throw new IllegalStateException("Message id must be specified for message removal mode");
123 }
124 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED };
125 break;
126 }
127 default:
128 throw new RuntimeException("Invalid mode: " + mode);
129 }
130 for (MessageDeliveryStatus status: statuses) {
131 LOG.debug("Taking message deliveries with status: " + status);
132 }
133 Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses);
134 LOG.debug("Took " + ds.size() + " deliveries");
135 for (MessageDelivery md: ds) {
136 LOG.debug(md);
137 md.setProcessCount(md.getProcessCount().intValue() + 1);
138 }
139 return ds;
140 }
141
142 @Override
143 protected void unlockWorkItem(MessageDelivery item) {
144 item.setLockedDate(null);
145 dao.save(item);
146 }
147
148
149
150
151
152
153 @Override
154 protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) {
155 Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size());
156
157 Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>();
158 for (MessageDelivery messageDelivery: workItems) {
159
160 MessageDeliverer deliverer = registry.getDeliverer(messageDelivery);
161 if (deliverer == null) {
162 LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery);
163 result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery"));
164 unlockWorkItemAtomically(messageDelivery);
165 continue;
166 }
167
168 if (deliverer instanceof BulkMessageDeliverer) {
169
170 String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
171 Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key);
172 if (workUnit == null) {
173 workUnit = new LinkedList<MessageDelivery>();
174 bulkWorkUnits.put(key, workUnit);
175 }
176 workUnit.add(messageDelivery);
177 } else {
178 ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1);
179 l.add(messageDelivery);
180 groupedWorkItems.add(l);
181 }
182 }
183
184 return groupedWorkItems;
185 }
186
187
188 @Override
189 protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) {
190 MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next();
191
192 MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery);
193 if (messageDeliverer == null) {
194 throw new RuntimeException("Message deliverer could not be obtained");
195 }
196
197 if (messageDeliveries.size() > 1) {
198
199 if (!(messageDeliverer instanceof BulkMessageDeliverer)) {
200 throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
201 }
202 return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode);
203 } else {
204 return process(messageDeliverer, firstMessageDelivery, mode);
205 }
206 }
207
208
209
210
211
212
213
214 protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
215
216 try {
217 if (mode == Mode.DELIVER) {
218 messageDeliverer.deliver(messageDelivery);
219
220 messageDelivery.setProcessCount(Integer.valueOf(0));
221
222
223
224 updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
225 } else {
226 messageDeliverer.dismiss(messageDelivery, user, cause);
227
228 messageDeliveryService.deleteMessageDelivery(messageDelivery);
229 }
230 } catch (MessageDeliveryProcessingException nmde) {
231 LOG.error("Error processing message delivery " + messageDelivery, nmde);
232 throw new RuntimeException(nmde);
233 }
234
235 LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed.");
236
237
238 List<MessageDelivery> success = new ArrayList<MessageDelivery>(1);
239 success.add(messageDelivery);
240 return success;
241 }
242
243
244
245
246
247
248
249 protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries, Mode mode) {
250 MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
251
252 try {
253 if (mode == Mode.DELIVER) {
254 messageDeliverer.bulkDeliver(messageDeliveries);
255 } else {
256 messageDeliverer.bulkDismiss(messageDeliveries);
257 }
258 } catch (MessageDeliveryProcessingException nmde) {
259 LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde);
260 throw new RuntimeException(nmde);
261 }
262
263
264
265
266 List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size());
267 for (MessageDelivery nmd: messageDeliveries) {
268 successes.add(nmd);
269 LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered.");
270
271 if (mode == Mode.REMOVE) {
272 messageDeliveryService.deleteMessageDelivery(nmd);
273 } else {
274 nmd.setProcessCount(0);
275 updateStatusAndUnlock(nmd, targetStatus);
276 }
277 }
278
279 return successes;
280 }
281
282 @Override
283 protected void finishProcessing(ProcessingResult<MessageDelivery> result) {
284 LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures");
285 Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size());
286 for (MessageDelivery md: result.getSuccesses()) {
287 messageIds.add(md.getMessage().getId());
288 }
289 MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService();
290 for (Long id: messageIds) {
291 LOG.debug("Finishing processing message " + id);
292
293
294 Message m = ms.getMessage(id);
295
296 Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m);
297 if (c.size() == 0) {
298 LOG.debug("Deleting message " + m);
299 ms.deleteMessage(m);
300 } else {
301 LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries");
302 for (MessageDelivery md: c) {
303 LOG.debug(md);
304 }
305 }
306 }
307 }
308
309
310
311
312
313 protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) {
314 messageDelivery.setDeliveryStatus(status);
315
316 messageDelivery.setLockedDate(null);
317 dao.save(messageDelivery);
318 }
319
320 @Override
321 public ProcessingResult<MessageDelivery> run() {
322 LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause);
323 return super.run();
324 }
325
326 public void execute(JobExecutionContext context) throws JobExecutionException {
327 String mode = context.getMergedJobDataMap().getString("mode");
328 if (mode != null) {
329 this.mode = Mode.valueOf(mode);
330 } else {
331 this.mode = Mode.DELIVER;
332 }
333 this.user = context.getMergedJobDataMap().getString("user");
334 this.cause = context.getMergedJobDataMap().getString("cause");
335 if (context.getMergedJobDataMap().containsKey("messageId")) {
336 this.messageId = context.getMergedJobDataMap().getLong("messageId");
337 }
338 LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "====");
339 super.run();
340 }
341 }