001 /**
002 * Copyright 2005-2012 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.kcb.quartz;
017
018 import org.apache.log4j.Logger;
019 import org.kuali.rice.core.framework.persistence.dao.GenericDao;
020 import org.kuali.rice.kcb.bo.Message;
021 import org.kuali.rice.kcb.bo.MessageDelivery;
022 import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
023 import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
024 import org.kuali.rice.kcb.deliverer.MessageDeliverer;
025 import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException;
026 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
027 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
028 import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
029 import org.kuali.rice.kcb.service.MessageDeliveryService;
030 import org.kuali.rice.kcb.service.MessageService;
031 import org.quartz.JobExecutionContext;
032 import org.quartz.JobExecutionException;
033 import org.quartz.StatefulJob;
034 import org.springframework.beans.factory.annotation.Required;
035
036 import java.util.ArrayList;
037 import java.util.Collection;
038 import java.util.HashMap;
039 import java.util.HashSet;
040 import java.util.LinkedList;
041 import java.util.List;
042 import java.util.Map;
043 import java.util.Set;
044
045 /**
046 * Job that delivers messages to endpoints. This job is not really stateful,
047 * but should not be executed concurrently.
048 *
049 * @author Kuali Rice Team (rice.collab@kuali.org)
050 */
051 public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
052 public static final String NAME = "MessageProcessingJobDetail";
053 public static final String GROUP = "KCB-Delivery";
054
055 public static enum Mode {
056 DELIVER, REMOVE
057 }
058
059 private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class);
060
061 private GenericDao dao;
062 private MessageDelivererRegistryService registry;
063 private MessageDeliveryService messageDeliveryService;
064 private Long messageId;
065 private Mode mode = null;
066 private String user;
067 private String cause;
068
069 public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) {
070 this();
071 this.messageId = messageId;
072 this.mode = mode;
073 this.user = user;
074 this.cause = cause;
075 }
076
077
078 public MessageProcessingJob() {
079 dao = GlobalKCBServiceLocator.getInstance().getKcbGenericDao();
080 registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
081 messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
082 txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
083 }
084
085 /**
086 * Sets the {@link GenericDao}
087 * @param dao the {@link GenericDao}
088 */
089 @Required
090 public void setGenericDao(GenericDao dao) {
091 this.dao = dao;
092 }
093
094 /**
095 * Sets the {@link MessageDelivererRegistryService}
096 * @param registry the {@link MessageDelivererRegistryService}
097 */
098 @Required
099 public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) {
100 this.registry = registry;
101 }
102
103 /**
104 * Sets the {@link MessageDeliveryService}
105 * @param messageDeliveryService the {@link MessageDeliveryService}
106 */
107 @Required
108 public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
109 this.messageDeliveryService = messageDeliveryService;
110 }
111
112 @Override
113 protected Collection<MessageDelivery> takeAvailableWorkItems() {
114 MessageDeliveryStatus[] statuses;
115 switch (mode) {
116 case DELIVER: {
117 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED };
118 break;
119 }
120 case REMOVE: {
121 if (messageId == null) {
122 throw new IllegalStateException("Message id must be specified for message removal mode");
123 }
124 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED };
125 break;
126 }
127 default:
128 throw new RuntimeException("Invalid mode: " + mode);
129 }
130 for (MessageDeliveryStatus status: statuses) {
131 LOG.debug("Taking message deliveries with status: " + status);
132 }
133 Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses);
134 LOG.debug("Took " + ds.size() + " deliveries");
135 for (MessageDelivery md: ds) {
136 LOG.debug(md);
137 md.setProcessCount(md.getProcessCount().intValue() + 1);
138 }
139 return ds;
140 }
141
142 @Override
143 protected void unlockWorkItem(MessageDelivery item) {
144 item.setLockedDate(null);
145 dao.save(item);
146 }
147
148 /**
149 * Group work items by deliverer and notification, so that deliveries to bulk deliverers are grouped
150 * by notification
151 * @see org.kuali.rice.ken.service.impl.ConcurrentJob#groupWorkItems(java.util.Collection)
152 */
153 @Override
154 protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) {
155 Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size());
156
157 Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>();
158 for (MessageDelivery messageDelivery: workItems) {
159
160 MessageDeliverer deliverer = registry.getDeliverer(messageDelivery);
161 if (deliverer == null) {
162 LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery);
163 result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery"));
164 unlockWorkItemAtomically(messageDelivery);
165 continue;
166 }
167
168 if (deliverer instanceof BulkMessageDeliverer) {
169 // group by bulk-deliverer+message combo
170 String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
171 Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key);
172 if (workUnit == null) {
173 workUnit = new LinkedList<MessageDelivery>();
174 bulkWorkUnits.put(key, workUnit);
175 }
176 workUnit.add(messageDelivery);
177 } else {
178 ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1);
179 l.add(messageDelivery);
180 groupedWorkItems.add(l);
181 }
182 }
183
184 return groupedWorkItems;
185 }
186
187
188 @Override
189 protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) {
190 MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next();
191 // get our hands on the appropriate MessageDeliverer instance
192 MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery);
193 if (messageDeliverer == null) {
194 throw new RuntimeException("Message deliverer could not be obtained");
195 }
196
197 if (messageDeliveries.size() > 1) {
198 // this is a bulk deliverer, so we need to batch the MessageDeliveries
199 if (!(messageDeliverer instanceof BulkMessageDeliverer)) {
200 throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
201 }
202 return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode);
203 } else {
204 return process(messageDeliverer, firstMessageDelivery, mode);
205 }
206 }
207
208 /**
209 * Implements delivery of a single MessageDelivery
210 * @param deliverer the deliverer
211 * @param messageDelivery the delivery
212 * @return collection of strings indicating successful deliveries
213 */
214 protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
215 // we have our message deliverer, so tell it to deliver the message
216 try {
217 if (mode == Mode.DELIVER) {
218 messageDeliverer.deliver(messageDelivery);
219 // if processing was successful, set the count back to zero
220 messageDelivery.setProcessCount(Integer.valueOf(0));
221 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
222 // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
223 // and persist
224 updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
225 } else {
226 messageDeliverer.dismiss(messageDelivery, user, cause);
227 // don't need to set the processing count down to zero because we are just deleting the record entirely
228 messageDeliveryService.deleteMessageDelivery(messageDelivery);
229 }
230 } catch (MessageDeliveryProcessingException nmde) {
231 LOG.error("Error processing message delivery " + messageDelivery, nmde);
232 throw new RuntimeException(nmde);
233 }
234
235 LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed.");
236 //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + messageDelivery.getMessage().getId(), System.currentTimeMillis() - messageDelivery.getNotification().getSendDateTime().getTime());
237
238 List<MessageDelivery> success = new ArrayList<MessageDelivery>(1);
239 success.add(messageDelivery);
240 return success;
241 }
242
243 /**
244 * Implements bulk delivery of a collection of {@link MessageDelivery}s
245 * @param deliverer the deliverer
246 * @param messageDeliveries the deliveries
247 * @return collection of strings indicating successful deliveries
248 */
249 protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries, Mode mode) {
250 MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
251 // we have our message deliverer, so tell it to deliver the message
252 try {
253 if (mode == Mode.DELIVER) {
254 messageDeliverer.bulkDeliver(messageDeliveries);
255 } else {
256 messageDeliverer.bulkDismiss(messageDeliveries);
257 }
258 } catch (MessageDeliveryProcessingException nmde) {
259 LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde);
260 throw new RuntimeException(nmde);
261 }
262
263 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
264 // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
265 // and persist
266 List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size());
267 for (MessageDelivery nmd: messageDeliveries) {
268 successes.add(nmd);
269 LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered.");
270 //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + nmd.getMessage().getId(), System.currentTimeMillis() - nmd.getNotification().getSendDateTime().getTime());
271 if (mode == Mode.REMOVE) {
272 messageDeliveryService.deleteMessageDelivery(nmd);
273 } else {
274 nmd.setProcessCount(0);
275 updateStatusAndUnlock(nmd, targetStatus);
276 }
277 }
278
279 return successes;
280 }
281
282 @Override
283 protected void finishProcessing(ProcessingResult<MessageDelivery> result) {
284 LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures");
285 Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size());
286 for (MessageDelivery md: result.getSuccesses()) {
287 messageIds.add(md.getMessage().getId());
288 }
289 MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService();
290 for (Long id: messageIds) {
291 LOG.debug("Finishing processing message " + id);
292 //if (Mode.REMOVE == mode) {
293
294 Message m = ms.getMessage(id);
295
296 Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m);
297 if (c.size() == 0) {
298 LOG.debug("Deleting message " + m);
299 ms.deleteMessage(m);
300 } else {
301 LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries");
302 for (MessageDelivery md: c) {
303 LOG.debug(md);
304 }
305 }
306 }
307 }
308
309 /**
310 * Marks a MessageDelivery as having been delivered, and unlocks it
311 * @param messageDelivery the messageDelivery instance to mark
312 */
313 protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) {
314 messageDelivery.setDeliveryStatus(status);
315 // mark as unlocked
316 messageDelivery.setLockedDate(null);
317 dao.save(messageDelivery);
318 }
319
320 @Override
321 public ProcessingResult<MessageDelivery> run() {
322 LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause);
323 return super.run();
324 }
325
326 public void execute(JobExecutionContext context) throws JobExecutionException {
327 String mode = context.getMergedJobDataMap().getString("mode");
328 if (mode != null) {
329 this.mode = Mode.valueOf(mode);
330 } else {
331 this.mode = Mode.DELIVER;
332 }
333 this.user = context.getMergedJobDataMap().getString("user");
334 this.cause = context.getMergedJobDataMap().getString("cause");
335 if (context.getMergedJobDataMap().containsKey("messageId")) {
336 this.messageId = context.getMergedJobDataMap().getLong("messageId");
337 }
338 LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "====");
339 super.run();
340 }
341 }