001 /** 002 * Copyright 2005-2014 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.kcb.quartz; 017 018 import org.apache.log4j.Logger; 019 import org.kuali.rice.core.framework.persistence.dao.GenericDao; 020 import org.kuali.rice.kcb.bo.Message; 021 import org.kuali.rice.kcb.bo.MessageDelivery; 022 import org.kuali.rice.kcb.bo.MessageDeliveryStatus; 023 import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer; 024 import org.kuali.rice.kcb.deliverer.MessageDeliverer; 025 import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException; 026 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure; 027 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator; 028 import org.kuali.rice.kcb.service.MessageDelivererRegistryService; 029 import org.kuali.rice.kcb.service.MessageDeliveryService; 030 import org.kuali.rice.kcb.service.MessageService; 031 import org.kuali.rice.krad.data.DataObjectService; 032 import org.kuali.rice.krad.service.KRADServiceLocator; 033 import org.quartz.JobExecutionContext; 034 import org.quartz.JobExecutionException; 035 import org.quartz.StatefulJob; 036 import org.springframework.beans.factory.annotation.Required; 037 038 import java.util.ArrayList; 039 import java.util.Collection; 040 import java.util.HashMap; 041 import java.util.HashSet; 042 import java.util.LinkedList; 043 import java.util.List; 044 import java.util.Map; 045 import java.util.Set; 046 047 /** 048 * Job that delivers messages to endpoints. This job is not really stateful, 049 * but should not be executed concurrently. 050 * 051 * @author Kuali Rice Team (rice.collab@kuali.org) 052 */ 053 public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob { 054 public static final String NAME = "MessageProcessingJobDetail"; 055 public static final String GROUP = "KCB-Delivery"; 056 057 public static enum Mode { 058 DELIVER, REMOVE 059 } 060 061 private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class); 062 063 private DataObjectService dataObjectService; 064 private MessageDelivererRegistryService registry; 065 private MessageDeliveryService messageDeliveryService; 066 private Long messageId; 067 private Mode mode = null; 068 private String user; 069 private String cause; 070 071 public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) { 072 this(); 073 this.messageId = messageId; 074 this.mode = mode; 075 this.user = user; 076 this.cause = cause; 077 } 078 079 080 public MessageProcessingJob() { 081 registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService(); 082 messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService(); 083 txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager(); 084 dataObjectService = KRADServiceLocator.getDataObjectService(); 085 } 086 087 /** 088 * Sets the {@link DataObjectService} 089 * @param dataObjectService the {@link DataObjectService} 090 */ 091 public void setDataObjectService(DataObjectService dataObjectService) { 092 this.dataObjectService = dataObjectService; 093 } 094 095 /** 096 * Sets the {@link MessageDelivererRegistryService} 097 * @param registry the {@link MessageDelivererRegistryService} 098 */ 099 @Required 100 public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) { 101 this.registry = registry; 102 } 103 104 /** 105 * Sets the {@link MessageDeliveryService} 106 * @param messageDeliveryService the {@link MessageDeliveryService} 107 */ 108 @Required 109 public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) { 110 this.messageDeliveryService = messageDeliveryService; 111 } 112 113 @Override 114 protected Collection<MessageDelivery> takeAvailableWorkItems() { 115 MessageDeliveryStatus[] statuses; 116 switch (mode) { 117 case DELIVER: { 118 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED }; 119 break; 120 } 121 case REMOVE: { 122 if (messageId == null) { 123 throw new IllegalStateException("Message id must be specified for message removal mode"); 124 } 125 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED }; 126 break; 127 } 128 default: 129 throw new RuntimeException("Invalid mode: " + mode); 130 } 131 for (MessageDeliveryStatus status: statuses) { 132 LOG.debug("Taking message deliveries with status: " + status); 133 } 134 Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses); 135 LOG.debug("Took " + ds.size() + " deliveries"); 136 for (MessageDelivery md: ds) { 137 LOG.debug(md); 138 md.setProcessCount(md.getProcessCount().intValue() + 1); 139 } 140 return ds; 141 } 142 143 @Override 144 protected void unlockWorkItem(MessageDelivery item) { 145 item.setLockedDate(null); 146 dataObjectService.save(item); 147 } 148 149 /** 150 * Group work items by deliverer and notification, so that deliveries to bulk deliverers are grouped 151 * by notification 152 * @see org.kuali.rice.ken.service.impl.ConcurrentJob#groupWorkItems(java.util.Collection) 153 */ 154 @Override 155 protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) { 156 Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size()); 157 158 Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>(); 159 for (MessageDelivery messageDelivery: workItems) { 160 161 MessageDeliverer deliverer = registry.getDeliverer(messageDelivery); 162 if (deliverer == null) { 163 LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery); 164 result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery")); 165 unlockWorkItemAtomically(messageDelivery); 166 continue; 167 } 168 169 if (deliverer instanceof BulkMessageDeliverer) { 170 // group by bulk-deliverer+message combo 171 String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId(); 172 Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key); 173 if (workUnit == null) { 174 workUnit = new LinkedList<MessageDelivery>(); 175 bulkWorkUnits.put(key, workUnit); 176 } 177 workUnit.add(messageDelivery); 178 } else { 179 ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1); 180 l.add(messageDelivery); 181 groupedWorkItems.add(l); 182 } 183 } 184 185 return groupedWorkItems; 186 } 187 188 189 @Override 190 protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) { 191 MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next(); 192 // get our hands on the appropriate MessageDeliverer instance 193 MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery); 194 if (messageDeliverer == null) { 195 throw new RuntimeException("Message deliverer could not be obtained"); 196 } 197 198 if (messageDeliveries.size() > 1) { 199 // this is a bulk deliverer, so we need to batch the MessageDeliveries 200 if (!(messageDeliverer instanceof BulkMessageDeliverer)) { 201 throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer"); 202 } 203 return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode); 204 } else { 205 return process(messageDeliverer, firstMessageDelivery, mode); 206 } 207 } 208 209 /** 210 * Implements delivery of a single MessageDelivery 211 * @param messageDeliverer the deliverer 212 * @param messageDelivery the delivery 213 * @return collection of strings indicating successful deliveries 214 */ 215 protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) { 216 // we have our message deliverer, so tell it to deliver the message 217 try { 218 if (mode == Mode.DELIVER) { 219 messageDeliverer.deliver(messageDelivery); 220 // if processing was successful, set the count back to zero 221 messageDelivery.setProcessCount(Integer.valueOf(0)); 222 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer 223 // so update the status of the delivery message instance to DELIVERED (and unmark as taken) 224 // and persist 225 updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED); 226 } else { 227 messageDeliverer.dismiss(messageDelivery, user, cause); 228 // don't need to set the processing count down to zero because we are just deleting the record entirely 229 messageDeliveryService.deleteMessageDelivery(messageDelivery); 230 } 231 } catch (MessageDeliveryProcessingException nmde) { 232 LOG.error("Error processing message delivery " + messageDelivery, nmde); 233 throw new RuntimeException(nmde); 234 } 235 236 LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed."); 237 //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + messageDelivery.getMessage().getId(), System.currentTimeMillis() - messageDelivery.getNotification().getSendDateTime().getTime()); 238 239 List<MessageDelivery> success = new ArrayList<MessageDelivery>(1); 240 success.add(messageDelivery); 241 return success; 242 } 243 244 /** 245 * Implements bulk delivery of a collection of {@link MessageDelivery}s 246 * @param messageDeliverer the deliverer 247 * @param messageDeliveries the deliveries 248 * @return collection of strings indicating successful deliveries 249 */ 250 protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries, Mode mode) { 251 MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED); 252 // we have our message deliverer, so tell it to deliver the message 253 try { 254 if (mode == Mode.DELIVER) { 255 messageDeliverer.bulkDeliver(messageDeliveries); 256 } else { 257 messageDeliverer.bulkDismiss(messageDeliveries); 258 } 259 } catch (MessageDeliveryProcessingException nmde) { 260 LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde); 261 throw new RuntimeException(nmde); 262 } 263 264 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer 265 // so update the status of the delivery message instance to DELIVERED (and unmark as taken) 266 // and persist 267 List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size()); 268 for (MessageDelivery nmd: messageDeliveries) { 269 successes.add(nmd); 270 LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered."); 271 //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + nmd.getMessage().getId(), System.currentTimeMillis() - nmd.getNotification().getSendDateTime().getTime()); 272 if (mode == Mode.REMOVE) { 273 messageDeliveryService.deleteMessageDelivery(nmd); 274 } else { 275 nmd.setProcessCount(0); 276 updateStatusAndUnlock(nmd, targetStatus); 277 } 278 } 279 280 return successes; 281 } 282 283 @Override 284 protected void finishProcessing(ProcessingResult<MessageDelivery> result) { 285 LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures"); 286 Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size()); 287 for (MessageDelivery md: result.getSuccesses()) { 288 messageIds.add(md.getMessage().getId()); 289 } 290 MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService(); 291 for (Long id: messageIds) { 292 LOG.debug("Finishing processing message " + id); 293 //if (Mode.REMOVE == mode) { 294 295 Message m = ms.getMessage(id); 296 297 Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m); 298 if (c.size() == 0) { 299 LOG.debug("Deleting message " + m); 300 ms.deleteMessage(m); 301 } else { 302 LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries"); 303 for (MessageDelivery md: c) { 304 LOG.debug(md); 305 } 306 } 307 } 308 } 309 310 /** 311 * Marks a MessageDelivery as having been delivered, and unlocks it 312 * @param messageDelivery the messageDelivery instance to mark 313 */ 314 protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) { 315 messageDelivery.setDeliveryStatus(status); 316 // mark as unlocked 317 messageDelivery.setLockedDate(null); 318 dataObjectService.save(messageDelivery); 319 } 320 321 @Override 322 public ProcessingResult<MessageDelivery> run() { 323 LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause); 324 return super.run(); 325 } 326 327 public void execute(JobExecutionContext context) throws JobExecutionException { 328 String mode = context.getMergedJobDataMap().getString("mode"); 329 if (mode != null) { 330 this.mode = Mode.valueOf(mode); 331 } else { 332 this.mode = Mode.DELIVER; 333 } 334 this.user = context.getMergedJobDataMap().getString("user"); 335 this.cause = context.getMergedJobDataMap().getString("cause"); 336 if (context.getMergedJobDataMap().containsKey("messageId")) { 337 this.messageId = context.getMergedJobDataMap().getLong("messageId"); 338 } 339 LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "===="); 340 super.run(); 341 } 342 }