001 /** 002 * Copyright 2005-2011 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.kcb.quartz; 017 018 import org.apache.log4j.Logger; 019 import org.kuali.rice.core.framework.persistence.dao.GenericDao; 020 import org.kuali.rice.kcb.bo.Message; 021 import org.kuali.rice.kcb.bo.MessageDelivery; 022 import org.kuali.rice.kcb.bo.MessageDeliveryStatus; 023 import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer; 024 import org.kuali.rice.kcb.deliverer.MessageDeliverer; 025 import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException; 026 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure; 027 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator; 028 import org.kuali.rice.kcb.service.MessageDelivererRegistryService; 029 import org.kuali.rice.kcb.service.MessageDeliveryService; 030 import org.kuali.rice.kcb.service.MessageService; 031 import org.quartz.JobExecutionContext; 032 import org.quartz.JobExecutionException; 033 import org.quartz.StatefulJob; 034 import org.springframework.beans.factory.annotation.Required; 035 036 import java.util.ArrayList; 037 import java.util.Collection; 038 import java.util.HashMap; 039 import java.util.HashSet; 040 import java.util.LinkedList; 041 import java.util.List; 042 import java.util.Map; 043 import java.util.Set; 044 045 /** 046 * Job that delivers messages to endpoints. This job is not really stateful, 047 * but should not be executed concurrently. 048 * 049 * @author Kuali Rice Team (rice.collab@kuali.org) 050 */ 051 public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob { 052 public static final String NAME = "MessageProcessingJobDetail"; 053 public static final String GROUP = "KCB-Delivery"; 054 055 public static enum Mode { 056 DELIVER, REMOVE 057 } 058 059 private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class); 060 061 private GenericDao dao; 062 private MessageDelivererRegistryService registry; 063 private MessageDeliveryService messageDeliveryService; 064 private Long messageId; 065 private Mode mode = null; 066 private String user; 067 private String cause; 068 069 public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) { 070 this(); 071 this.messageId = messageId; 072 this.mode = mode; 073 this.user = user; 074 this.cause = cause; 075 } 076 077 078 public MessageProcessingJob() { 079 dao = GlobalKCBServiceLocator.getInstance().getKcbGenericDao(); 080 registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService(); 081 messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService(); 082 txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager(); 083 } 084 085 /** 086 * Sets the {@link GenericDao} 087 * @param dao the {@link GenericDao} 088 */ 089 @Required 090 public void setGenericDao(GenericDao dao) { 091 this.dao = dao; 092 } 093 094 /** 095 * Sets the {@link MessageDelivererRegistryService} 096 * @param registry the {@link MessageDelivererRegistryService} 097 */ 098 @Required 099 public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) { 100 this.registry = registry; 101 } 102 103 /** 104 * Sets the {@link MessageDeliveryService} 105 * @param messageDeliveryService the {@link MessageDeliveryService} 106 */ 107 @Required 108 public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) { 109 this.messageDeliveryService = messageDeliveryService; 110 } 111 112 @Override 113 protected Collection<MessageDelivery> takeAvailableWorkItems() { 114 MessageDeliveryStatus[] statuses; 115 switch (mode) { 116 case DELIVER: { 117 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED }; 118 break; 119 } 120 case REMOVE: { 121 if (messageId == null) { 122 throw new IllegalStateException("Message id must be specified for message removal mode"); 123 } 124 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED }; 125 break; 126 } 127 default: 128 throw new RuntimeException("Invalid mode: " + mode); 129 } 130 for (MessageDeliveryStatus status: statuses) { 131 LOG.debug("Taking message deliveries with status: " + status); 132 } 133 Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses); 134 LOG.debug("Took " + ds.size() + " deliveries"); 135 for (MessageDelivery md: ds) { 136 LOG.debug(md); 137 md.setProcessCount(md.getProcessCount().intValue() + 1); 138 } 139 return ds; 140 } 141 142 @Override 143 protected void unlockWorkItem(MessageDelivery item) { 144 item.setLockedDate(null); 145 dao.save(item); 146 } 147 148 /** 149 * Group work items by deliverer and notification, so that deliveries to bulk deliverers are grouped 150 * by notification 151 * @see org.kuali.rice.ken.service.impl.ConcurrentJob#groupWorkItems(java.util.Collection) 152 */ 153 @Override 154 protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) { 155 Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size()); 156 157 Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>(); 158 for (MessageDelivery messageDelivery: workItems) { 159 160 MessageDeliverer deliverer = registry.getDeliverer(messageDelivery); 161 if (deliverer == null) { 162 LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery); 163 result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery")); 164 unlockWorkItemAtomically(messageDelivery); 165 continue; 166 } 167 168 if (deliverer instanceof BulkMessageDeliverer) { 169 // group by bulk-deliverer+message combo 170 String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId(); 171 Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key); 172 if (workUnit == null) { 173 workUnit = new LinkedList<MessageDelivery>(); 174 bulkWorkUnits.put(key, workUnit); 175 } 176 workUnit.add(messageDelivery); 177 } else { 178 ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1); 179 l.add(messageDelivery); 180 groupedWorkItems.add(l); 181 } 182 } 183 184 return groupedWorkItems; 185 } 186 187 188 @Override 189 protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) { 190 MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next(); 191 // get our hands on the appropriate MessageDeliverer instance 192 MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery); 193 if (messageDeliverer == null) { 194 throw new RuntimeException("Message deliverer could not be obtained"); 195 } 196 197 if (messageDeliveries.size() > 1) { 198 // this is a bulk deliverer, so we need to batch the MessageDeliveries 199 if (!(messageDeliverer instanceof BulkMessageDeliverer)) { 200 throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer"); 201 } 202 return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode); 203 } else { 204 return process(messageDeliverer, firstMessageDelivery, mode); 205 } 206 } 207 208 /** 209 * Implements delivery of a single MessageDelivery 210 * @param deliverer the deliverer 211 * @param messageDelivery the delivery 212 * @return collection of strings indicating successful deliveries 213 */ 214 protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) { 215 // we have our message deliverer, so tell it to deliver the message 216 try { 217 if (mode == Mode.DELIVER) { 218 messageDeliverer.deliver(messageDelivery); 219 // if processing was successful, set the count back to zero 220 messageDelivery.setProcessCount(Integer.valueOf(0)); 221 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer 222 // so update the status of the delivery message instance to DELIVERED (and unmark as taken) 223 // and persist 224 updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED); 225 } else { 226 messageDeliverer.dismiss(messageDelivery, user, cause); 227 // don't need to set the processing count down to zero because we are just deleting the record entirely 228 messageDeliveryService.deleteMessageDelivery(messageDelivery); 229 } 230 } catch (MessageDeliveryProcessingException nmde) { 231 LOG.error("Error processing message delivery " + messageDelivery, nmde); 232 throw new RuntimeException(nmde); 233 } 234 235 LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed."); 236 //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + messageDelivery.getMessage().getId(), System.currentTimeMillis() - messageDelivery.getNotification().getSendDateTime().getTime()); 237 238 List<MessageDelivery> success = new ArrayList<MessageDelivery>(1); 239 success.add(messageDelivery); 240 return success; 241 } 242 243 /** 244 * Implements bulk delivery of a collection of {@link MessageDelivery}s 245 * @param deliverer the deliverer 246 * @param messageDeliveries the deliveries 247 * @return collection of strings indicating successful deliveries 248 */ 249 protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries, Mode mode) { 250 MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED); 251 // we have our message deliverer, so tell it to deliver the message 252 try { 253 if (mode == Mode.DELIVER) { 254 messageDeliverer.bulkDeliver(messageDeliveries); 255 } else { 256 messageDeliverer.bulkDismiss(messageDeliveries); 257 } 258 } catch (MessageDeliveryProcessingException nmde) { 259 LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde); 260 throw new RuntimeException(nmde); 261 } 262 263 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer 264 // so update the status of the delivery message instance to DELIVERED (and unmark as taken) 265 // and persist 266 List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size()); 267 for (MessageDelivery nmd: messageDeliveries) { 268 successes.add(nmd); 269 LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered."); 270 //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + nmd.getMessage().getId(), System.currentTimeMillis() - nmd.getNotification().getSendDateTime().getTime()); 271 if (mode == Mode.REMOVE) { 272 messageDeliveryService.deleteMessageDelivery(nmd); 273 } else { 274 nmd.setProcessCount(0); 275 updateStatusAndUnlock(nmd, targetStatus); 276 } 277 } 278 279 return successes; 280 } 281 282 @Override 283 protected void finishProcessing(ProcessingResult<MessageDelivery> result) { 284 LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures"); 285 Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size()); 286 for (MessageDelivery md: result.getSuccesses()) { 287 messageIds.add(md.getMessage().getId()); 288 } 289 MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService(); 290 for (Long id: messageIds) { 291 LOG.debug("Finishing processing message " + id); 292 //if (Mode.REMOVE == mode) { 293 294 Message m = ms.getMessage(id); 295 296 Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m); 297 if (c.size() == 0) { 298 LOG.debug("Deleting message " + m); 299 ms.deleteMessage(m); 300 } else { 301 LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries"); 302 for (MessageDelivery md: c) { 303 LOG.debug(md); 304 } 305 } 306 } 307 } 308 309 /** 310 * Marks a MessageDelivery as having been delivered, and unlocks it 311 * @param messageDelivery the messageDelivery instance to mark 312 */ 313 protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) { 314 messageDelivery.setDeliveryStatus(status); 315 // mark as unlocked 316 messageDelivery.setLockedDate(null); 317 dao.save(messageDelivery); 318 } 319 320 @Override 321 public ProcessingResult<MessageDelivery> run() { 322 LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause); 323 return super.run(); 324 } 325 326 public void execute(JobExecutionContext context) throws JobExecutionException { 327 String mode = context.getMergedJobDataMap().getString("mode"); 328 if (mode != null) { 329 this.mode = Mode.valueOf(mode); 330 } else { 331 this.mode = Mode.DELIVER; 332 } 333 this.user = context.getMergedJobDataMap().getString("user"); 334 this.cause = context.getMergedJobDataMap().getString("cause"); 335 if (context.getMergedJobDataMap().containsKey("messageId")) { 336 this.messageId = context.getMergedJobDataMap().getLong("messageId"); 337 } 338 LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "===="); 339 super.run(); 340 } 341 }